-
Notifications
You must be signed in to change notification settings - Fork 7.1k
[DataFrame] Implement sort_values and sort_index #1977
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
e5289c5
bece269
02e6edf
7e8a9e3
a26b010
3bcad98
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3272,15 +3272,174 @@ def slice_shift(self, periods=1, axis=0): | |
| def sort_index(self, axis=0, level=None, ascending=True, inplace=False, | ||
| kind='quicksort', na_position='last', sort_remaining=True, | ||
| by=None): | ||
| raise NotImplementedError( | ||
| "To contribute to Pandas on Ray, please visit " | ||
| "github.com/ray-project/ray.") | ||
| """Sort a DataFrame by one of the indices (columns or index). | ||
|
|
||
| Args: | ||
| axis: The axis to sort over. | ||
| level: The MultiIndex level to sort over. | ||
| ascending: Ascending or descending | ||
| inplace: Whether or not to update this DataFrame inplace. | ||
| kind: How to perform the sort. | ||
| na_position: Where to position NA on the sort. | ||
| sort_remaining: On Multilevel Index sort based on all levels. | ||
| by: (Deprecated) argument to pass to sort_values. | ||
|
|
||
| Returns: | ||
| A sorted DataFrame | ||
| """ | ||
| if level is not None: | ||
| raise NotImplementedError("Multilevel index not yet implemented.") | ||
|
|
||
| if by is not None: | ||
| warnings.warn("by argument to sort_index is deprecated, " | ||
| "please use .sort_values(by=...)", | ||
| FutureWarning, stacklevel=2) | ||
| if level is not None: | ||
| raise ValueError("unable to simultaneously sort by and level") | ||
| return self.sort_values(by, axis=axis, ascending=ascending, | ||
| inplace=inplace) | ||
|
|
||
| axis = pd.DataFrame()._get_axis_number(axis) | ||
|
|
||
| args = (axis, level, ascending, False, kind, na_position, | ||
| sort_remaining) | ||
|
|
||
| def _sort_helper(df, index, axis, *args): | ||
| if axis == 0: | ||
| df.index = index | ||
| else: | ||
| df.columns = index | ||
|
|
||
| return df.sort_index(*args) | ||
|
|
||
| if axis == 0: | ||
| index = (self.index) | ||
|
||
| new_column_parts = _map_partitions( | ||
| lambda df: _sort_helper(df, index, axis, *args), | ||
| self._col_partitions) | ||
|
|
||
| new_columns = self.columns | ||
| new_index = self.index.sort_values() | ||
| new_row_parts = None | ||
| else: | ||
| columns = (self.columns) | ||
| new_row_parts = _map_partitions( | ||
| lambda df: _sort_helper(df, columns, axis, *args), | ||
| self._row_partitions) | ||
|
|
||
| new_columns = self.columns.sort_values() | ||
| new_index = self.index | ||
| new_column_parts = None | ||
|
|
||
| if not inplace: | ||
| return DataFrame(col_partitions=new_column_parts, | ||
| row_partitions=new_row_parts, | ||
| index=new_index, | ||
| columns=new_columns) | ||
| else: | ||
| self._update_inplace(row_partitions=new_row_parts, | ||
| col_partitions=new_column_parts, | ||
| columns=new_columns, | ||
| index=new_index) | ||
|
|
||
| def sort_values(self, by, axis=0, ascending=True, inplace=False, | ||
| kind='quicksort', na_position='last'): | ||
| raise NotImplementedError( | ||
| "To contribute to Pandas on Ray, please visit " | ||
| "github.com/ray-project/ray.") | ||
| """Sorts by a column/row or list of columns/rows. | ||
|
|
||
| Args: | ||
| by: A list of labels for the axis to sort over. | ||
| axis: The axis to sort. | ||
| ascending: Sort in ascending or descending order. | ||
| inplace: If true, do the operation inplace. | ||
| kind: How to sort. | ||
| na_position: Where to put np.nan values. | ||
|
|
||
| Returns: | ||
| A sorted DataFrame. | ||
| """ | ||
|
|
||
| axis = pd.DataFrame()._get_axis_number(axis) | ||
|
|
||
| if not is_list_like(by): | ||
| by = [by] | ||
|
|
||
| if axis == 0: | ||
| broadcast_value_dict = {str(col): self[col] for col in by} | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can this be done as a single getitem call? You can pass in a list of column names.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It returns a ray DataFrame, so we'd have to |
||
| broadcast_values = pd.DataFrame(broadcast_value_dict) | ||
| else: | ||
| broadcast_value_list = [to_pandas(self[row::len(self.index)]) | ||
| for row in by] | ||
|
|
||
| index_builder = list(zip(broadcast_value_list, by)) | ||
|
|
||
| for row, idx in index_builder: | ||
| row.index = [str(idx)] | ||
|
|
||
| # Put this here to match the by below. | ||
| by = [str(col) for col in by] | ||
|
||
| broadcast_values = pd.concat([row for row, idx in index_builder]) | ||
|
|
||
| # We are converting the by to string here so that we don't have a | ||
| # collision with the RangeIndex on the inner frame. It is cheap and | ||
| # guarantees that we sort by the correct column. | ||
| by = [str(col) for col in by] | ||
|
|
||
| args = (by, axis, ascending, False, kind, na_position) | ||
|
|
||
| def _sort_helper(df, broadcast_values, axis, *args): | ||
| """Sorts the data on a partition. | ||
|
|
||
| Args: | ||
| df: The DataFrame to sort. | ||
| broadcast_values: The by DataFrame to use for the sort. | ||
| axis: The axis to sort over. | ||
| args: The args for the sort. | ||
|
|
||
| Returns: | ||
| A new sorted DataFrame. | ||
| """ | ||
| if axis == 0: | ||
| broadcast_values.index = df.index | ||
| names = broadcast_values.columns | ||
| else: | ||
| broadcast_values.columns = df.columns | ||
| names = broadcast_values.index | ||
|
|
||
| return pd.concat([df, broadcast_values], axis=axis ^ 1, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Unfortunately that is not always faster.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When it is slower it can be up to 3x slower, so to avoid that worst case we will leave it like this for now. |
||
| copy=False).sort_values(*args)\ | ||
| .drop(names, axis=axis ^ 1) | ||
|
|
||
| if axis == 0: | ||
| new_column_partitions = _map_partitions( | ||
| lambda df: _sort_helper(df, broadcast_values, axis, *args), | ||
| self._col_partitions) | ||
|
|
||
| new_row_partitions = None | ||
| new_columns = self.columns | ||
|
|
||
| # This is important because it allows us to get the axis that we | ||
| # aren't sorting over. We need the order of the columns/rows and | ||
| # this will provide that in the return value. | ||
| new_index = broadcast_values.sort_values(*args).index | ||
| else: | ||
| new_row_partitions = _map_partitions( | ||
| lambda df: _sort_helper(df, broadcast_values, axis, *args), | ||
| self._row_partitions) | ||
|
|
||
| new_column_partitions = None | ||
| new_columns = broadcast_values.sort_values(*args).columns | ||
| new_index = self.index | ||
|
|
||
| if inplace: | ||
| self._update_inplace(row_partitions=new_row_partitions, | ||
| col_partitions=new_column_partitions, | ||
| columns=new_columns, | ||
| index=new_index) | ||
| else: | ||
| return DataFrame(row_partitions=new_row_partitions, | ||
| col_partitions=new_column_partitions, | ||
| columns=new_columns, | ||
| index=new_index) | ||
|
|
||
| def sortlevel(self, level=0, axis=0, ascending=True, inplace=False, | ||
| sort_remaining=True): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -927,8 +927,6 @@ def test_append(): | |
|
|
||
| pandas_df2 = pd.DataFrame({"col5": [0], "col6": [1]}) | ||
|
|
||
| print(ray_df.append(ray_df2)) | ||
|
|
||
| assert ray_df_equals_pandas(ray_df.append(ray_df2), | ||
| pandas_df.append(pandas_df2)) | ||
|
|
||
|
|
@@ -2619,17 +2617,43 @@ def test_slice_shift(): | |
|
|
||
|
|
||
| def test_sort_index(): | ||
| ray_df = create_test_dataframe() | ||
| pandas_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 100))) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. NIT: Share data between the two tests. |
||
| ray_df = rdf.DataFrame(pandas_df) | ||
|
|
||
| with pytest.raises(NotImplementedError): | ||
| ray_df.sort_index() | ||
| pandas_result = pandas_df.sort_index() | ||
| ray_result = ray_df.sort_index() | ||
|
|
||
| ray_df_equals_pandas(ray_result, pandas_result) | ||
|
|
||
| pandas_result = pandas_df.sort_index(ascending=False) | ||
| ray_result = ray_df.sort_index(ascending=False) | ||
|
|
||
| ray_df_equals_pandas(ray_result, pandas_result) | ||
|
|
||
|
|
||
| def test_sort_values(): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Use a pytest fixture here; other tests can benefit from the large random dataframe being constructed.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. At some point I think this should happen, but this probably isn't the PR to go through and make all these changes to the tests.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Okay, this can be done in the future - separately. |
||
| ray_df = create_test_dataframe() | ||
| pandas_df = pd.DataFrame(np.random.randint(0, 100, size=(1000, 100))) | ||
| ray_df = rdf.DataFrame(pandas_df) | ||
|
|
||
| with pytest.raises(NotImplementedError): | ||
| ray_df.sort_values(None) | ||
| pandas_result = pandas_df.sort_values(by=1) | ||
| ray_result = ray_df.sort_values(by=1) | ||
|
|
||
| ray_df_equals_pandas(ray_result, pandas_result) | ||
|
|
||
| pandas_result = pandas_df.sort_values(by=1, axis=1) | ||
| ray_result = ray_df.sort_values(by=1, axis=1) | ||
|
|
||
| ray_df_equals_pandas(ray_result, pandas_result) | ||
|
|
||
| pandas_result = pandas_df.sort_values(by=[1, 3]) | ||
| ray_result = ray_df.sort_values(by=[1, 3]) | ||
|
|
||
| ray_df_equals_pandas(ray_result, pandas_result) | ||
|
|
||
| pandas_result = pandas_df.sort_values(by=[1, 67], axis=1) | ||
| ray_result = ray_df.sort_values(by=[1, 67], axis=1) | ||
|
|
||
| ray_df_equals_pandas(ray_result, pandas_result) | ||
|
|
||
|
|
||
| def test_sortlevel(): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The index should be reset to a RangeIndex after this operation.