diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index b96c4c836453..e8fd0b422540 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -71,7 +71,6 @@ def __init__(self, data=None, index=None, columns=None, dtype=None, col_metadata (_IndexMetadata): Metadata for the new dataframe's columns """ - self._row_metadata = self._col_metadata = None # Check type of data and use appropriate constructor if data is not None or (col_partitions is None and @@ -97,9 +96,9 @@ def __init__(self, data=None, index=None, columns=None, dtype=None, else: # created this invariant to make sure we never have to go into the # partitions to get the columns - assert columns is not None, \ - "Columns not defined, must define columns for internal " \ - "DataFrame creations" + assert columns is not None or col_metadata is not None, \ + "Columns not defined, must define columns or col_metadata " \ + "for internal DataFrame creations" if block_partitions is not None: # put in numpy array here to make accesses easier since it's 2D @@ -109,18 +108,18 @@ def __init__(self, data=None, index=None, columns=None, dtype=None, if row_partitions is not None: axis = 0 partitions = row_partitions + axis_length = len(columns) if columns is not None else \ + len(col_metadata) elif col_partitions is not None: axis = 1 partitions = col_partitions + axis_length = None + # TODO: write explicit tests for "short and wide" + # column partitions self._block_partitions = \ _create_block_partitions(partitions, axis=axis, - length=len(columns)) - - if row_metadata is not None: - self._row_metadata = row_metadata.copy() - if col_metadata is not None: - self._col_metadata = col_metadata.copy() + length=axis_length) # Sometimes we only get a single column or row, which is # problematic for building blocks from the partitions, so we @@ -133,10 +132,19 @@ def __init__(self, data=None, index=None, columns=None, dtype=None, # Create the row and column index objects for using our partitioning. # If the objects haven't been inherited, then generate them - if self._row_metadata is None: + if row_metadata is not None: + self._row_metadata = row_metadata.copy() + if index is not None: + self.index = index + else: self._row_metadata = _IndexMetadata(self._block_partitions[:, 0], index=index, axis=0) - if self._col_metadata is None: + + if col_metadata is not None: + self._col_metadata = col_metadata.copy() + if columns is not None: + self.columns = columns + else: self._col_metadata = _IndexMetadata(self._block_partitions[0, :], index=columns, axis=1) @@ -518,7 +526,8 @@ def add_prefix(self, prefix): new_cols = self.columns.map(lambda x: str(prefix) + str(x)) return DataFrame(block_partitions=self._block_partitions, columns=new_cols, - index=self.index) + col_metadata=self._col_metadata, + row_metadata=self._row_metadata) def add_suffix(self, suffix): """Add a suffix to each of the column names. @@ -529,7 +538,8 @@ def add_suffix(self, suffix): new_cols = self.columns.map(lambda x: str(x) + str(suffix)) return DataFrame(block_partitions=self._block_partitions, columns=new_cols, - index=self.index) + col_metadata=self._col_metadata, + row_metadata=self._row_metadata) def applymap(self, func): """Apply a function to a DataFrame elementwise. 
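Note: the constructor hunks above establish a precedence rule: an inherited _IndexMetadata object is copied and used as-is, with an explicit `index`/`columns` argument applied afterwards as a pure relabel; metadata is only rebuilt from the partitions when nothing was inherited. A minimal sketch of that rule for the row axis (the helper name `resolve_row_metadata` is hypothetical, and `_IndexMetadata` semantics are assumed from the index_metadata.py changes below):

    # Hypothetical standalone restatement of the precedence in __init__;
    # not the PR's exact code.
    def resolve_row_metadata(block_partitions, row_metadata=None, index=None):
        if row_metadata is not None:
            meta = row_metadata.copy()   # keep cached lengths and coord_df
            if index is not None:
                meta.index = index       # relabel; no partition scan needed
            return meta
        # Nothing inherited: build metadata remotely from the first block col.
        return _IndexMetadata(block_partitions[:, 0], index=index, axis=0)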
@@ -546,8 +556,8 @@ def applymap(self, func): for block in self._block_partitions]) return DataFrame(block_partitions=new_block_partitions, - columns=self.columns, - index=self.index) + row_metadata=self._row_metadata, + col_metadata=self._col_metadata) def copy(self, deep=True): """Creates a shallow copy of the DataFrame. @@ -659,8 +669,6 @@ def isna(self): lambda df: df.isna(), block) for block in self._block_partitions]) return DataFrame(block_partitions=new_block_partitions, - columns=self.columns, - index=self.index, row_metadata=self._row_metadata, col_metadata=self._col_metadata) @@ -678,8 +686,8 @@ def isnull(self): for block in self._block_partitions]) return DataFrame(block_partitions=new_block_partitions, - columns=self.columns, - index=self.index) + row_metadata=self._row_metadata, + col_metadata=self._col_metadata) def keys(self): """Get the info axis for the DataFrame. @@ -1173,13 +1181,13 @@ def _cumulative_helper(self, func, axis): if axis == 0: new_cols = _map_partitions(func, self._col_partitions) return DataFrame(col_partitions=new_cols, - columns=self.columns, - index=self.index) + row_metadata=self._row_metadata, + col_metadata=self._col_metadata) else: new_rows = _map_partitions(func, self._row_partitions) return DataFrame(row_partitions=new_rows, - columns=self.columns, - index=self.index) + row_metadata=self._row_metadata, + col_metadata=self._col_metadata) def cummax(self, axis=None, skipna=True, *args, **kwargs): """Perform a cumulative maximum across the DataFrame. @@ -1869,7 +1877,7 @@ def head(self, n=5): index = self._row_metadata.index[:n] return DataFrame(col_partitions=new_dfs, - columns=self.columns, + col_metadata=self._col_metadata, index=index) def hist(self, data, column=None, by=None, grid=True, xlabelsize=None, @@ -2498,8 +2506,8 @@ def notna(self): lambda df: df.notna(), block) for block in self._block_partitions]) return DataFrame(block_partitions=new_block_partitions, - columns=self.columns, - index=self.index) + row_metadata=self._row_metadata, + col_metadata=self._col_metadata) def notnull(self): """Perform notnull across the DataFrame. 
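Note: the hunks above all follow one rule: an operation that preserves an axis forwards that axis's cached _IndexMetadata instead of raw labels, so the coordinate dataframe and partition lengths never have to be recomputed from the partitions. A toy check of the invariant this relies on, in plain pandas rather than the Ray API:

    import pandas as pd

    df = pd.DataFrame({"a": [1.0, None], "b": [3.0, 4.0]})
    # Elementwise ops (applymap, isna, round, ...) preserve both axes,
    # so both metadata objects can be inherited unchanged.
    assert df.isna().index.equals(df.index)
    assert df.isna().columns.equals(df.columns)
    # head() preserves only the column axis; the row labels must be sliced.
    assert df.head(1).columns.equals(df.columns)
    assert df.head(1).index.equals(df.index[:1])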
@@ -2516,8 +2524,8 @@ def notnull(self): for block in self._block_partitions]) return DataFrame(block_partitions=new_block_partitions, - columns=self.columns, - index=self.index) + row_metadata=self._row_metadata, + col_metadata=self._col_metadata) def nsmallest(self, n, columns, keep='first'): raise NotImplementedError( @@ -2682,7 +2690,8 @@ def query_helper(df): if inplace: self._update_inplace(row_partitions=new_rows) else: - return DataFrame(row_partitions=new_rows, columns=self.columns) + return DataFrame(row_partitions=new_rows, + col_metadata=self._col_metadata) def radd(self, other, axis='columns', level=None, fill_value=None): return self.add(other, axis, level, fill_value) @@ -2939,8 +2948,8 @@ def round(self, decimals=0, *args, **kwargs): for block in self._block_partitions]) return DataFrame(block_partitions=new_block_partitions, - columns=self.columns, - index=self.index) + row_metadata=self._row_metadata, + col_metadata=self._col_metadata) def rpow(self, other, axis='columns', level=None, fill_value=None): return self._single_df_op_helper( @@ -3372,7 +3381,7 @@ def tail(self, n=5): index = self._row_metadata.index[-n:] return DataFrame(col_partitions=new_dfs, - columns=self.columns, + col_metadata=self._col_metadata, index=index) def take(self, indices, axis=0, convert=None, is_copy=True, **kwargs): @@ -3764,8 +3773,8 @@ def _getitem_slice(self, key): index = self.index[key] return DataFrame(col_partitions=new_cols, - index=index, - columns=self.columns) + col_metadata=self._col_metadata, + index=index) def __getattr__(self, key): """After regular attribute access, looks up the name in the columns @@ -4073,8 +4082,8 @@ def __neg__(self): for block in self._block_partitions]) return DataFrame(block_partitions=new_block_partitions, - columns=self.columns, - index=self.index) + col_metadata=self._col_metadata, + row_metadata=self._row_metadata) def __sizeof__(self): raise NotImplementedError( diff --git a/python/ray/dataframe/index_metadata.py b/python/ray/dataframe/index_metadata.py index 235809ec7a35..50ba251a8ff9 100644 --- a/python/ray/dataframe/index_metadata.py +++ b/python/ray/dataframe/index_metadata.py @@ -3,115 +3,27 @@ import ray from .utils import ( - _build_index, - _build_columns) + _build_row_lengths, + _build_col_widths, + _build_coord_df) from pandas.core.indexing import convert_to_index_sliceable -class _IndexMetadataBase(object): +class _IndexMetadata(object): """Wrapper for Pandas indexes in Ray DataFrames. Handles all of the metadata specific to the axis of partition (setting indexes, - calculating the index within partition of a value, etc.) since the - dataframe may be partitioned across either axis. This way we can unify the - possible index operations over one axis-agnostic interface. - - This class is the abstract superclass for IndexMetadata and - WrappingIndexMetadata, which handle indexes along the partitioned and - non-partitioned axes, respectively. + calculating the index within partition of a value, etc.). This + implementation assumes the underlying index lies across multiple + partitions. IMPORTANT NOTE: Currently all operations, as implemented, are inplace. - """ - - def _get__coord_df(self): - if isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID): - self._coord_df_cache = ray.get(self._coord_df_cache) - return self._coord_df_cache - - def _set__coord_df(self, coord_df): - self._coord_df_cache = coord_df - - _coord_df = property(_get__coord_df, _set__coord_df) - - def _get_index(self): - """Get the index wrapped by this IndexDF. 
-
-        Returns:
-            The index wrapped by this IndexDF
-        """
-        return self._coord_df.index
-
-    def _set_index(self, new_index):
-        """Set the index wrapped by this IndexDF.
-
-        Args:
-            new_index: The new index to wrap
-        """
-        self._coord_df.index = new_index
-
-    index = property(_get_index, _set_index)
-
-    def coords_of(self, key):
-        raise NotImplementedError()
-
-    def __getitem__(self, key):
-        return self.coords_of(key)
-
-    def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
-                group_keys=True, squeeze=False, **kwargs):
-        raise NotImplementedError()
-
-    def __len__(self):
-        return len(self._coord_df)
-
-    def first_valid_index(self):
-        return self._coord_df.first_valid_index()
 
-    def last_valid_index(self):
-        return self._coord_df.last_valid_index()
-
-    def insert(self, key, loc=None, partition=None,
-               index_within_partition=None):
-        raise NotImplementedError()
-
-    def drop(self, labels, errors='raise'):
-        """Drop the specified labels from the IndexMetadata
-
-        Args:
-            labels (scalar or list-like):
-                The labels to drop
-            errors ('raise' or 'ignore'):
-                If 'ignore', suppress errors for when labels don't exist
-
-        Returns:
-            DataFrame with coordinates of dropped labels
-        """
-        # TODO(patyang): This produces inconsistent indexes.
-        dropped = self.coords_of(labels)
-        self._coord_df = self._coord_df.drop(labels, errors=errors)
-        return dropped
-
-    def rename_index(self, mapper):
-        """Rename the index.
-
-        Args:
-            mapper: name to rename the index as
-        """
-        self._coord_df = self._coord_df.rename_axis(mapper, axis=0)
-
-    def convert_to_index_sliceable(self, key):
-        """Converts and performs error checking on the passed slice
-
-        Args:
-            key: slice to convert and check
-        """
-        return convert_to_index_sliceable(self._coord_df, key)
-
-
-class _IndexMetadata(_IndexMetadataBase):
-    """IndexMetadata implementation for index across a partitioned axis. This
-    implementation assumes the underlying index lies across multiple
-    partitions.
+    WARNING: Currently, the `_lengths` item is the source of truth for an
+    _IndexMetadata object, since it is easy to manage, and the coord_df item
+    may be deprecated in the future. As such, it is _very_ important that any
+    function that mutates the coord_df's partition splits in any way first
+    modifies the lengths. Otherwise the lengths and coord_df fall out of sync.
     """
 
     def __init__(self, dfs=None, index=None, axis=0, lengths_oid=None,
@@ -127,12 +39,20 @@ def __init__(self, dfs=None, index=None, axis=0, lengths_oid=None,
             A IndexMetadata backed by the specified pd.Index, partitioned off
             specified partitions
         """
-        if dfs is not None:
-            lengths_oid, coord_df_oid = \
-                _build_index.remote(dfs, index) if axis == 0 else \
-                _build_columns.remote(dfs, index)
-        self._coord_df = coord_df_oid
+        assert (lengths_oid is None) == (coord_df_oid is None), \
+            "Must pass both or neither of lengths_oid and coord_df_oid"
+
+        if dfs is not None and lengths_oid is None:
+            if axis == 0:
+                lengths_oid = _build_row_lengths.remote(dfs)
+            else:
+                lengths_oid = _build_col_widths.remote(dfs)
+            coord_df_oid = _build_coord_df.remote(lengths_oid, index)
+
+        self._lengths = lengths_oid
+        self._coord_df = coord_df_oid
+        self._index_cache = index
+        self._cached_index = False
 
     def _get__lengths(self):
         if isinstance(self._lengths_cache, ray.local_scheduler.ObjectID) or \
@@ -146,6 +66,68 @@ def _set__lengths(self, lengths):
 
     _lengths = property(_get__lengths, _set__lengths)
 
+    def _get__coord_df(self):
+        """Get the coordinate dataframe wrapped by this _IndexMetadata.
+
+        Since we may have had an index set before our coord_df was
+        materialized, we'll have to apply it to the newly materialized df.
+        """
+        if isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID):
+            self._coord_df_cache = ray.get(self._coord_df_cache)
+            if self._cached_index:
+                self._coord_df_cache.index = self._index_cache
+                self._cached_index = False
+        return self._coord_df_cache
+
+    def _set__coord_df(self, coord_df):
+        """Set the coordinate dataframe wrapped by this _IndexMetadata.
+
+        Sometimes we set the _IndexMetadata's coord_df outside of the
+        constructor, generally using functions like drop(). This produces a
+        modified index, so we need to reflect the change in the index cache.
+
+        If the coord_df being set is an OID instead (e.g., due to a copy), we
+        fall back to relying on `_index_cache`.
+        """
+        if not isinstance(coord_df, ray.local_scheduler.ObjectID):
+            self._index_cache = coord_df.index
+        self._coord_df_cache = coord_df
+
+    _coord_df = property(_get__coord_df, _set__coord_df)
+
+    def _get_index(self):
+        """Get the index wrapped by this _IndexMetadata.
+
+        The only time `self._index_cache` would be None is in a newly created
+        _IndexMetadata object without a specified `index` parameter (see the
+        _IndexMetadata constructor for more details).
+        """
+        if isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID):
+            if self._index_cache is None:
+                self._index_cache = pd.RangeIndex(len(self))
+            return self._index_cache
+        else:
+            return self._coord_df_cache.index
+
+    def _set_index(self, new_index):
+        """Set the index wrapped by this _IndexMetadata.
+
+        It is important to always set `_index_cache`, even if the coord_df is
+        materialized, because it may be swapped out for an OID later on. This
+        is more straightforward than caching the index whenever the coord_df
+        is set to an OID, since an OID may also be replaced by another OID.
+        """
+        new_index = pd.DataFrame(index=new_index).index
+        assert len(new_index) == len(self)
+
+        self._index_cache = new_index
+        if isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID):
+            self._cached_index = True
+        else:
+            self._coord_df_cache.index = new_index
+
+    index = property(_get_index, _set_index)
+
     def coords_of(self, key):
         """Returns the coordinates (partition, index_within_partition) of the
         provided key in the index. Can be called on its own or implicitly
@@ -154,7 +136,7 @@ def coords_of(self, key):
 
         Args:
             key: item to get coordinates of. Can also be a tuple of item
-                and {partition, index_within_partition} if caller only
+                and {"partition", "index_within_partition"} if caller only
                 needs one of the coordinates
 
         Returns:
@@ -180,8 +162,6 @@ def partition_series(self, partition):
                              'index_within_partition']
 
     def __len__(self):
-        # Hard to say if this is faster than IndexMetadataBase.__len__ if
-        # self._coord_df is non-resident
         return sum(self._lengths)
 
     def reset_partition_coords(self, partitions=None):
@@ -263,6 +243,12 @@ def insert(self, key, loc=None, partition=None,
         return coord_to_insert
 
     def squeeze(self, partition, index_within_partition):
+        """Prepare a single coordinate for removal by "squeezing" the
+        subsequent coordinates "up" one index within that partition. To be
+        used with `_IndexMetadata.drop`, which drops all the "squeezed"
+        coordinates in batch. Note that this function does not mutate the
+        existing coord_df in place; it replaces it with an updated copy.
+        """
         self._coord_df = self._coord_df.copy()
 
         partition_mask = self._coord_df.partition == partition
@@ -272,76 +258,91 @@ def squeeze(self, partition, index_within_partition):
                            'index_within_partition'] -= 1
 
     def copy(self):
-        return _IndexMetadata(coord_df_oid=self._coord_df,
-                              lengths_oid=self._lengths)
+        # TODO: Investigate copy-on-write wrapper for metadata objects
+        coord_df_copy = self._coord_df_cache
+        if not isinstance(self._coord_df_cache, ray.local_scheduler.ObjectID):
+            coord_df_copy = self._coord_df_cache.copy()
 
+        lengths_copy = self._lengths_cache
+        if not isinstance(self._lengths_cache, ray.local_scheduler.ObjectID):
+            lengths_copy = self._lengths_cache.copy()
 
-class _WrappingIndexMetadata(_IndexMetadata):
-    """IndexMetadata implementation for index across a non-partitioned axis.
-    This implementation assumes the underlying index lies across one partition.
-    """
+        index_copy = self._index_cache
+        if self._index_cache is not None:
+            index_copy = self._index_cache.copy()
 
-    def __init__(self, index):
-        """Inits a IndexMetadata from Pandas Index only.
-
-        Args:
-            index (pd.Index): Index to wrap.
-
-        Returns:
-            A IndexMetadata backed by the specified pd.Index.
-        """
-        self._coord_df = pd.DataFrame(index=index)
-        # Set _lengths as a dummy variable for future-proof method inheritance
-        self._lengths = [len(index)]
+        return _IndexMetadata(index=index_copy,
+                              coord_df_oid=coord_df_copy,
+                              lengths_oid=lengths_copy)
 
-    def coords_of(self, key):
+    def __getitem__(self, key):
         """Returns the coordinates (partition, index_within_partition) of the
-        provided key in the index
+        provided key in the index. Essentially just an alias for
+        `_IndexMetadata.coords_of` that additionally supports slices, since
+        slice notation cannot be passed to an ordinary method and must go
+        through `__getitem__` calls.
 
         Args:
-            key: item to get coordinates of
+            key:
+                item to get coordinates of. Can also be a tuple of item
+                and {"partition", "index_within_partition"} if caller only
+                needs one of the coordinates
 
         Returns:
             Pandas object with the keys specified. If key is a single object
             it will be a pd.Series with items `partition` and
-            `index_within_partition`, and if key is a slice it will be a
-            pd.DataFrame with said items as columns.
+            `index_within_partition`, and if key is a slice or a duplicated
+            label it will be a pd.DataFrame with said items as columns.
         """
-        locs = self.index.get_loc(key)
-        # locs may be a single int, a slice, or a boolean mask.
-        # Convert here to iterable of integers
-        loc_idxs = pd.RangeIndex(len(self.index))[locs]
-        # TODO: Investigate "modify view/copy" warning
-        ret_obj = self._coord_df.loc[key]
-        ret_obj['partition'] = 0
-        ret_obj['index_within_partition'] = loc_idxs
-        return ret_obj
+        return self.coords_of(key)
 
-    def groupby(self, by=None, axis=0, level=None, as_index=True, sort=True,
-                group_keys=True, squeeze=False, **kwargs):
-        raise NotImplementedError()
+    def first_valid_index(self):
+        return self._coord_df.first_valid_index()
 
-    def insert(self, key, loc=None, partition=None,
-               index_within_partition=None):
-        """Inserts a key at a certain location in the index, or a certain coord
-        in a partition. Called with either `loc` or `partition` and
-        `index_within_partition`. If called with both, `loc` will be used.
+    def last_valid_index(self):
+        return self._coord_df.last_valid_index()
+
+    def drop(self, labels, errors='raise'):
+        """Drop the specified labels from the IndexMetadata
 
         Args:
-            key: item to insert into index
-            loc: location to insert into index
-            partition: partition to insert into
-            index_within_partition: index within partition to insert into
+            labels (scalar or list-like):
+                The labels to drop
+            errors ('raise' or 'ignore'):
+                If 'ignore', suppress errors for when labels don't exist
 
         Returns:
-            DataFrame with coordinates of insert
+            DataFrame with coordinates of dropped labels
         """
-        # Generate new index
-        new_index = self.index.insert(loc, key)
+        dropped = self.coords_of(labels)
 
-        # Make new empty coord_df
-        self._coord_df = pd.DataFrame(index=new_index)
+        # Update lengths first to prevent possible length inconsistencies
+        if isinstance(dropped, pd.DataFrame):
+            drop_per_part = dropped.groupby(["partition"]).size()\
+                .reindex(index=pd.RangeIndex(len(self._lengths)),
+                         fill_value=0)
+        elif isinstance(dropped, pd.Series):
+            drop_per_part = np.zeros_like(self._lengths)
+            drop_per_part[dropped["partition"]] = 1
+        else:
+            raise AssertionError("Unrecognized result from `coords_of`")
+        self._lengths = self._lengths - drop_per_part
+
+        self._coord_df = self._coord_df.drop(labels, errors=errors)
+        return dropped
 
-        # Shouldn't really need this, but here to maintain API consistency
-        return pd.DataFrame({'partition': 0, 'index_within_partition': loc},
-                            index=[key])
+    def rename_index(self, mapper):
+        """Rename the index.
+
+        Args:
+            mapper: name to rename the index as
+        """
+        self._coord_df = self._coord_df.rename_axis(mapper, axis=0)
+
+    def convert_to_index_sliceable(self, key):
+        """Converts and performs error checking on the passed slice
+
+        Args:
+            key: slice to convert and check
+        """
+        return convert_to_index_sliceable(self._coord_df, key)
diff --git a/python/ray/dataframe/utils.py b/python/ray/dataframe/utils.py
index 97c166d09413..70f97fe5df80 100644
--- a/python/ray/dataframe/utils.py
+++ b/python/ray/dataframe/utils.py
@@ -157,34 +157,32 @@ def _map_partitions(func, partitions, *argslists):
             for part, args in zip(partitions, *argslists)]
 
 
-@ray.remote(num_return_vals=2)
-def _build_columns(df_col, columns):
-    """Build columns and compute lengths for each partition."""
-    # Columns and width
+@ray.remote
+def _build_col_widths(df_col):
+    """Compute widths (# of columns) for each partition."""
     widths = np.array(ray.get([_deploy_func.remote(_get_widths, d)
                                for d in df_col]))
-    dest_indices = [(p_idx, p_sub_idx) for p_idx in range(len(widths))
-                    for p_sub_idx in range(widths[p_idx])]
-
-    col_names = ("partition", "index_within_partition")
-    column_df = pd.DataFrame(dest_indices, index=columns, columns=col_names)
-    return widths, column_df
+    return widths
 
 
-@ray.remote(num_return_vals=2)
-def _build_index(df_row, index):
-    """Build index and compute lengths for each partition."""
-    # Rows and length
+@ray.remote
+def _build_row_lengths(df_row):
+    """Compute lengths (# of rows) for each partition."""
    lengths = np.array(ray.get([_deploy_func.remote(_get_lengths, d)
                                 for d in df_row]))
-    dest_indices = [(p_idx, p_sub_idx) for p_idx in range(len(lengths))
-                    for p_sub_idx in range(lengths[p_idx])]
-    col_names = ("partition", "index_within_partition")
-    index_df = pd.DataFrame(dest_indices, index=index,
-                            columns=col_names)
+    return lengths
 
-    return lengths, index_df
+
+@ray.remote
+def _build_coord_df(lengths, index):
+    """Build the coordinate dataframe over all partitions."""
+    coords = np.vstack([np.column_stack((np.full(l, i), np.arange(l)))
+                        for i, l in enumerate(lengths)])
+
+    col_names = ("partition", "index_within_partition")
+    return pd.DataFrame(coords, index=index, columns=col_names)
 
 
 def _create_block_partitions(partitions, axis=0, length=None):
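Note: for reference, `_build_coord_df` produces one row per global label, mapping it to a (partition, index_within_partition) pair, and `_IndexMetadata.drop` keeps the cached lengths authoritative by subtracting per-partition drop counts before touching the coord_df. A standalone sketch with made-up lengths and labels:

    import numpy as np
    import pandas as pd

    # Coord_df for partition lengths [2, 3] over labels a..e; mirrors the
    # vstack/column_stack construction in _build_coord_df above.
    lengths = np.array([2, 3])
    coords = np.vstack([np.column_stack((np.full(n, i), np.arange(n)))
                        for i, n in enumerate(lengths)])
    coord_df = pd.DataFrame(coords, index=list("abcde"),
                            columns=("partition", "index_within_partition"))

    # Lengths-first bookkeeping from _IndexMetadata.drop: count the dropped
    # labels per partition, update lengths, then drop from the coord_df.
    labels = ["b", "d", "e"]
    dropped = coord_df.loc[labels]
    drop_per_part = (dropped.groupby(["partition"]).size()
                     .reindex(index=pd.RangeIndex(len(lengths)),
                              fill_value=0))
    lengths = lengths - drop_per_part          # now [1, 1]
    coord_df = coord_df.drop(labels)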