[DataFrame] Implements filter and dropna #1959

Merged
Commits (11):

- 3d4ec01 implement filter (kunalgosar)
- 1115453 begin implementation of dropna (kunalgosar)
- e51a5f3 implement dropna (kunalgosar)
- 83bd854 docs and tests (kunalgosar)
- c0d8df4 resolving comments (kunalgosar)
- f082a5d resolving merge (kunalgosar)
- 32ee1e1 add error checking to dropna (kunalgosar)
- b94adb4 fix update inplace call (kunalgosar)
- 16faf64 Implement multiple axis for dropna (#13) (SaladRaider)
- 0d901c1 resolve comments (kunalgosar)
- 69f9e29 fix lint (kunalgosar)
```diff
@@ -9,7 +9,7 @@
 from pandas._libs import lib
 from pandas.core.dtypes.cast import maybe_upcast_putmask
 from pandas import compat
-from pandas.compat import lzip, string_types, cPickle as pkl
+from pandas.compat import lzip, to_str, string_types, cPickle as pkl
 import pandas.core.common as com
 from pandas.core.dtypes.common import (
     is_bool_dtype,
@@ -756,7 +756,8 @@ def transpose(self, *args, **kwargs):
 
     T = property(transpose)
 
-    def dropna(self, axis, how, thresh=None, subset=[], inplace=False):
+    def dropna(self, axis=0, how='any', thresh=None, subset=None,
+               inplace=False):
         """Create a new DataFrame from the removed NA values from this one.
 
         Args:
@@ -774,7 +775,94 @@ def dropna(self, axis, how, thresh=None, subset=[], inplace=False):
             If inplace is set to True, returns None, otherwise returns a new
             DataFrame with the dropna applied.
         """
-        raise NotImplementedError("Not yet")
+        inplace = validate_bool_kwarg(inplace, "inplace")
+
+        if is_list_like(axis):
+            axis = [pd.DataFrame()._get_axis_number(ax) for ax in axis]
+
+            result = self
+            # TODO(kunalgosar): this builds an intermediate dataframe,
+            # which does unnecessary computation
+            for ax in axis:
+                result = result.dropna(
+                    axis=ax, how=how, thresh=thresh, subset=subset)
+            if not inplace:
+                return result
+
+            self._update_inplace(block_partitions=result._block_partitions,
+                                 columns=result.columns,
+                                 index=result.index)
+
+            return None
+
+        axis = pd.DataFrame()._get_axis_number(axis)
+
+        if how is not None and how not in ['any', 'all']:
+            raise ValueError('invalid how option: %s' % how)
+        if how is None and thresh is None:
+            raise TypeError('must specify how or thresh')
+
+        if subset is not None:
+            subset = set(subset)
+
+            if axis == 1:
+                subset = [item for item in self.index if item in subset]
+            else:
+                subset = [item for item in self.columns if item in subset]
+
+        def dropna_helper(df):
+            new_df = df.dropna(axis=axis, how=how, thresh=thresh,
+                               subset=subset, inplace=False)
+
+            if axis == 1:
+                new_index = new_df.columns
+                new_df.columns = pd.RangeIndex(0, len(new_df.columns))
+            else:
+                new_index = new_df.index
+                new_df.reset_index(drop=True, inplace=True)
+
+            return new_df, new_index
+
+        parts = self._col_partitions if axis == 1 else self._row_partitions
+        result = [_deploy_func._submit(args=(dropna_helper, df),
+                                       num_return_vals=2) for df in parts]
+        new_parts, new_vals = [list(t) for t in zip(*result)]
+
+        if axis == 1:
+            new_vals = [self._col_metadata.get_global_indices(i, vals)
+                        for i, vals in enumerate(ray.get(new_vals))]
+
+            # This flattens the 2d array to 1d
+            new_vals = [i for j in new_vals for i in j]
+            new_cols = self.columns[new_vals]
+
+            if not inplace:
+                return DataFrame(col_partitions=new_parts,
+                                 columns=new_cols,
+                                 index=self.index)
+
+            self._update_inplace(col_partitions=new_parts,
+                                 columns=new_cols,
+                                 index=self.index)
+
+        else:
+            new_vals = [self._row_metadata.get_global_indices(i, vals)
+                        for i, vals in enumerate(ray.get(new_vals))]
```
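The list-like `axis` branch above reduces a multi-axis `dropna` to repeated single-axis calls. Its intended semantics can be checked against plain pandas (this snippet uses only pandas, not the PR's DataFrame):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"a": [1.0, np.nan, 3.0],
                   "b": [np.nan, np.nan, np.nan],  # all-NaN column
                   "c": [1.0, 2.0, 3.0]})

# Looping over axes, as the list-like branch does:
result = df
for ax in [0, 1]:
    result = result.dropna(axis=ax, how="all")

# Equivalent chained single-axis calls:
expected = df.dropna(axis=0, how="all").dropna(axis=1, how="all")
print(result.equals(expected))  # True; only column "b" is dropped
```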
Reviewer (Member): Same here with the index.

Author (Contributor): As above.
```diff
+
+            # This flattens the 2d array to 1d
+            new_vals = [i for j in new_vals for i in j]
+            new_rows = self.index[new_vals]
+
+            if not inplace:
+                return DataFrame(row_partitions=new_parts,
+                                 index=new_rows,
+                                 columns=self.columns)
+
+            self._update_inplace(row_partitions=new_parts,
+                                 index=new_rows,
+                                 columns=self.columns)
+
+        return None
+
     def add(self, other, axis='columns', level=None, fill_value=None):
         """Add this DataFrame to another or a scalar/list.
 
@@ -1772,9 +1860,45 @@ def fillna(self, value=None, method=None, axis=None, inplace=False,
         return new_obj
 
     def filter(self, items=None, like=None, regex=None, axis=None):
-        raise NotImplementedError(
-            "To contribute to Pandas on Ray, please visit "
-            "github.com/ray-project/ray.")
+        """Subset rows or columns based on their labels
+
+        Args:
+            items (list): list of labels to subset
+            like (string): retain labels where `arg in label == True`
+            regex (string): retain labels matching regex input
+            axis: axis to filter on
+
+        Returns:
+            A new dataframe with the filter applied.
+        """
+        nkw = com._count_not_none(items, like, regex)
+        if nkw > 1:
+            raise TypeError('Keyword arguments `items`, `like`, or `regex` '
+                            'are mutually exclusive')
+        if nkw == 0:
+            raise TypeError('Must pass either `items`, `like`, or `regex`')
+
+        if axis is None:
+            axis = 'columns'  # This is the default info axis for dataframes
+
+        axis = pd.DataFrame()._get_axis_number(axis)
+        labels = self.columns if axis else self.index
+
+        if items is not None:
+            bool_arr = labels.isin(items)
+        elif like is not None:
+            def f(x):
+                return like in to_str(x)
+            bool_arr = labels.map(f).tolist()
+        else:
+            def f(x):
+                return matcher.search(to_str(x)) is not None
+            matcher = re.compile(regex)
+            bool_arr = labels.map(f).tolist()
+
+        if not axis:
+            return self[bool_arr]
+        return self[self.columns[bool_arr]]
 
     def first(self, offset):
         raise NotImplementedError(
@@ -3957,7 +4081,9 @@ def _getitem_array(self, key):
                 index=index)
             else:
                 columns = self._col_metadata[key].index
-                indices_for_rows = [col for col in self.col if col in set(columns)]
+                indices_for_rows = \
+                    [i for i, item in enumerate(self.columns)
+                     if item in set(columns)]
 
                 new_parts = [_deploy_func.remote(
                     lambda df: df.__getitem__(indices_for_rows),
```
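The new `filter` follows pandas' label-based selection: `items`, `like`, and `regex` are mutually exclusive, and the default axis is the columns. The intended behavior can be illustrated with plain pandas:

```python
import pandas as pd

df = pd.DataFrame({"one": [1, 2], "two": [3, 4], "three": [5, 6]})

# items: keep labels that are members of the given list
print(list(df.filter(items=["one", "three"]).columns))  # ['one', 'three']

# like: keep labels containing the substring
print(list(df.filter(like="t").columns))  # ['two', 'three']

# regex: keep labels matching the regular expression
print(list(df.filter(regex="e$").columns))  # ['one', 'three']
```

Passing more than one of `items`, `like`, or `regex` raises `TypeError`, exactly as in the `nkw > 1` check above.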
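The `_getitem_array` hunk replaces a reference to the nonexistent `self.col` with positional indices computed from `self.columns`, so the partitions can be indexed by integer position. The new comprehension behaves as follows (plain pandas used for illustration):

```python
import pandas as pd

df = pd.DataFrame({"x": [1], "y": [2], "z": [3]})
columns = ["y", "z"]  # labels selected by the key

# Positional indices of the selected labels, as in the diff:
indices_for_rows = [i for i, item in enumerate(df.columns)
                    if item in set(columns)]
print(indices_for_rows)  # [1, 2]
```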
Reviewer (Member): You can build the columns in a remote task. It might be better.

Author (Contributor): If I move this code to a remote task, it would require serializing `self._col_metadata._lengths` and `self.columns`, which could potentially be expensive. The code would later block on a `ray.get` for the new columns generated remotely. Given that there would be a blocking operation either way, I think this is fine for now, but it can be revisited.
Reviewer (Member): Where the blocking operation occurs does matter. Here the main thread will wait until the entire `dropna` operation is complete, whether the columns are needed immediately or not. We should block when the user needs the result, not when the user calls an operation. Given that this can be an iteratively executed operation, we should not block on each call.
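One way to address this request is to defer materialization: wrap the pending result so the blocking fetch happens on first access rather than inside `dropna` itself. A minimal, hypothetical sketch in plain Python (no Ray; `LazyIndex` is an illustrative name, not the project's API — in practice `_compute` would close over a `ray.get` call):

```python
class LazyIndex:
    """Defer an expensive computation until the value is first accessed."""

    def __init__(self, compute):
        self._compute = compute      # e.g. lambda: ray.get(object_id)
        self._value = None
        self._materialized = False

    def get(self):
        if not self._materialized:
            self._value = self._compute()  # block only on first access
            self._materialized = True
        return self._value


calls = []
lazy = LazyIndex(lambda: calls.append("blocked") or [0, 1, 2])

print(calls)       # [] -- constructing the wrapper did not block
print(lazy.get())  # [0, 1, 2]
print(calls)       # ['blocked'] -- blocked exactly once
lazy.get()
print(calls)       # still ['blocked'] -- cached on later accesses
```

With this pattern, iteratively calling `dropna` would queue remote work without stalling the main thread; the wait is paid once, when the index is actually read.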