diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index e9410b42dbb..27807e2359b 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -5,7 +5,8 @@ import numpy as np import pandas from pandas.compat import string_types -from pandas.core.dtypes.common import is_list_like +from pandas.core.dtypes.cast import find_common_type +from pandas.core.dtypes.common import (_get_dtype_from_object, is_list_like) from .partitioning.partition_collections import BlockPartitions, RayBlockPartitions from .partitioning.remote_partition import RayRemotePartition @@ -16,11 +17,32 @@ class PandasDataManager(object): with a Pandas backend. This logic is specific to Pandas. """ - def __init__(self, block_partitions_object, index, columns): + def __init__(self, block_partitions_object, index, columns, dtypes=None): assert isinstance(block_partitions_object, BlockPartitions) self.data = block_partitions_object self.index = index self.columns = columns + if dtypes is not None: + self._dtype_cache = dtypes + + # dtypes + _dtype_cache = None + + def _get_dtype(self): + if self._dtype_cache is None: + map_func = lambda df: df.dtypes + + def func(row): + return find_common_type(row.values) + + self._dtype_cache = self.data.full_reduce(map_func, lambda df: df.apply(func, axis=0), 0) + self._dtype_cache.index = self.columns + return self._dtype_cache + + def _set_dtype(self, dtypes): + self._dtype_cache = dtypes + + dtypes = property(_get_dtype, _set_dtype) # Index and columns objects # These objects are currently not distributed. @@ -53,7 +75,7 @@ def _set_columns(self, new_columns): columns = property(_get_columns, _set_columns) index = property(_get_index, _set_index) - # END Index and columns objects + # END Index, columns, and dtypes objects def compute_index(self, axis, data_object, compute_diff=True): """Computes the index after a number of rows have been removed. 
@@ -99,12 +121,12 @@ def _prepare_method(self, pandas_func, **kwargs): def add_prefix(self, prefix): cls = type(self) new_column_names = self.columns.map(lambda x: str(prefix) + str(x)) - return cls(self.data, self.index, new_column_names) + return cls(self.data, self.index, new_column_names, self.dtypes) def add_suffix(self, suffix): cls = type(self) new_column_names = self.columns.map(lambda x: str(x) + str(suffix)) - return cls(self.data, self.index, new_column_names) + return cls(self.data, self.index, new_column_names, self.dtypes) # END Metadata modification methods # Copy @@ -113,7 +135,7 @@ def add_suffix(self, suffix): # to prevent that. def copy(self): cls = type(self) - return cls(self.data.copy(), self.index.copy(), self.columns.copy()) + return cls(self.data.copy(), self.index.copy(), self.columns.copy(), self.dtypes.copy()) # Append/Concat/Join (Not Merge) # The append/concat/join operations should ideally never trigger remote @@ -446,7 +468,8 @@ def reindex_builer(df, axis, old_labels, new_labels, **kwargs): # Additionally this operation is often followed by an operation that # assumes identical partitioning. Internally, we *may* change the # partitioning during a map across a full axis. - return cls(self.map_across_full_axis(axis, func), new_index, new_columns) + new_data = self.map_across_full_axis(axis, func) + return cls(new_data, new_index, new_columns) def reset_index(self, **kwargs): cls = type(self) @@ -461,7 +484,7 @@ def reset_index(self, **kwargs): else: # The copies here are to ensure that we do not give references to # this object for the purposes of updates. - return cls(self.data.copy(), new_index, self.columns.copy()) + return cls(self.data.copy(), new_index, self.columns.copy(), self.dtypes.copy()) # END Reindex/reset_index # Transpose @@ -548,13 +571,14 @@ def sum(self, **kwargs): # Map partitions operations # These operations are operations that apply a function to every partition. 
- def map_partitions(self, func): + def map_partitions(self, func, new_dtypes=None): cls = type(self) - return cls(self.data.map_across_blocks(func), self.index, self.columns) + return cls(self.data.map_across_blocks(func), self.index, self.columns, new_dtypes) def abs(self): func = self._prepare_method(pandas.DataFrame.abs) - return self.map_partitions(func) + new_dtypes = pandas.Series([np.dtype('float64') for _ in self.columns], index=self.columns) + return self.map_partitions(func, new_dtypes=new_dtypes) def applymap(self, func): remote_func = self._prepare_method(pandas.DataFrame.applymap, func=func) @@ -562,15 +586,18 @@ def applymap(self, func): def isin(self, **kwargs): func = self._prepare_method(pandas.DataFrame.isin, **kwargs) - return self.map_partitions(func) + new_dtypes = pandas.Series([np.dtype('bool') for _ in self.columns], index=self.columns) + return self.map_partitions(func, new_dtypes=new_dtypes) def isna(self): func = self._prepare_method(pandas.DataFrame.isna) - return self.map_partitions(func) + new_dtypes = pandas.Series([np.dtype('bool') for _ in self.columns], index=self.columns) + return self.map_partitions(func, new_dtypes=new_dtypes) def isnull(self): func = self._prepare_method(pandas.DataFrame.isnull) - return self.map_partitions(func) + new_dtypes = pandas.Series([np.dtype('bool') for _ in self.columns], index=self.columns) + return self.map_partitions(func, new_dtypes=new_dtypes) def negative(self, **kwargs): func = self._prepare_method(pandas.DataFrame.__neg__, **kwargs) @@ -578,15 +605,17 @@ def negative(self, **kwargs): def notna(self): func = self._prepare_method(pandas.DataFrame.notna) - return self.map_partitions(func) + new_dtypes = pandas.Series([np.dtype('bool') for _ in self.columns], index=self.columns) + return self.map_partitions(func, new_dtypes=new_dtypes) def notnull(self): func = self._prepare_method(pandas.DataFrame.notnull) - return self.map_partitions(func) + new_dtypes = pandas.Series([np.dtype('bool') for _ 
in self.columns], index=self.columns) + return self.map_partitions(func, new_dtypes=new_dtypes) def round(self, **kwargs): func = self._prepare_method(pandas.DataFrame.round, **kwargs) - return self.map_partitions(func) + return self.map_partitions(func, new_dtypes=self.dtypes.copy()) # END Map partitions operations # Column/Row partitions reduce operations @@ -773,7 +802,7 @@ def query_builder(df, **kwargs): # Query removes rows, so we need to update the index new_index = self.compute_index(0, new_data, True) - return cls(new_data, new_index, self.columns) + return cls(new_data, new_index, self.columns, self.dtypes) def eval(self, expr, **kwargs): cls = type(self) @@ -821,7 +850,7 @@ def _cumulative_builder(self, func, **kwargs): axis = kwargs.get("axis", 0) func = self._prepare_method(func, **kwargs) new_data = self.map_across_full_axis(axis, func) - return cls(new_data, self.index, self.columns) + return cls(new_data, self.index, self.columns, self.dtypes) def cumsum(self, **kwargs): return self._cumulative_builder(pandas.DataFrame.cumsum, **kwargs) @@ -895,7 +924,7 @@ def mode(self, **kwargs): # We build these intermediate objects to avoid depending directly on # the underlying implementation. final_data = cls(new_data, new_index, new_columns).map_across_full_axis(axis, lambda df: df.reindex(axis=axis, labels=final_labels)) - return cls(final_data, new_index, new_columns) + return cls(final_data, new_index, new_columns, self.dtypes) def fillna(self, **kwargs): cls = type(self) @@ -945,8 +974,8 @@ def rank(self, **kwargs): new_columns = self.compute_index(1, new_data, True) else: new_columns = self.columns - - return cls(new_data, self.index, new_columns) + new_dtypes = pandas.Series([np.float64 for _ in new_columns], index=new_columns) + return cls(new_data, self.index, new_columns, new_dtypes) def diff(self, **kwargs): cls = type(self) @@ -970,40 +999,40 @@ def head(self, n): # ensure that we extract the correct data on each node. 
The index # on a transposed manager is already set to the correct value, so # we need to only take the head of that instead of re-transposing. - result = cls(self.data.transpose().take(1, n).transpose(), self.index[:n], self.columns) + result = cls(self.data.transpose().take(1, n).transpose(), self.index[:n], self.columns, self.dtypes) result._is_transposed = True else: - result = cls(self.data.take(0, n), self.index[:n], self.columns) + result = cls(self.data.take(0, n), self.index[:n], self.columns, self.dtypes) return result def tail(self, n): cls = type(self) # See head for an explanation of the transposed behavior if self._is_transposed: - result = cls(self.data.transpose().take(1, -n).transpose(), self.index[-n:], self.columns) + result = cls(self.data.transpose().take(1, -n).transpose(), self.index[-n:], self.columns, self.dtypes) result._is_transposed = True else: - result = cls(self.data.take(0, -n), self.index[-n:], self.columns) + result = cls(self.data.take(0, -n), self.index[-n:], self.columns, self.dtypes) return result def front(self, n): cls = type(self) # See head for an explanation of the transposed behavior if self._is_transposed: - result = cls(self.data.transpose().take(0, n).transpose(), self.index, self.columns[:n]) + result = cls(self.data.transpose().take(0, n).transpose(), self.index, self.columns[:n], self.dtypes[:n]) result._is_transposed = True else: - result = cls(self.data.take(1, n), self.index, self.columns[:n]) + result = cls(self.data.take(1, n), self.index, self.columns[:n], self.dtypes[:n]) return result def back(self, n): cls = type(self) # See head for an explanation of the transposed behavior if self._is_transposed: - result = cls(self.data.transpose().take(0, -n).transpose(), self.index, self.columns[-n:]) + result = cls(self.data.transpose().take(0, -n).transpose(), self.index, self.columns[-n:], self.dtypes[-n:]) result._is_transposed = True else: - result = cls(self.data.take(1, -n), self.index, self.columns[-n:]) + 
result = cls(self.data.take(1, -n), self.index, self.columns[-n:], self.dtypes[-n:]) return result # End Head/Tail/Front/Back @@ -1026,13 +1055,14 @@ def to_pandas(self): def from_pandas(cls, df, block_partitions_cls): new_index = df.index new_columns = df.columns + new_dtypes = df.dtypes # Set the columns to RangeIndex for memory efficiency df.index = pandas.RangeIndex(len(df.index)) df.columns = pandas.RangeIndex(len(df.columns)) new_data = block_partitions_cls.from_pandas(df) - return cls(new_data, new_index, new_columns) + return cls(new_data, new_index, new_columns, new_dtypes) # __getitem__ methods def getitem_single_key(self, key): @@ -1062,7 +1092,8 @@ def getitem(df, internal_indices=[]): # We can't just set the columns to key here because there may be # multiple instances of a key. new_columns = self.columns[numeric_indices] - return cls(result, self.index, new_columns) + new_dtypes = self.dtypes[numeric_indices] + return cls(result, self.index, new_columns, new_dtypes) def getitem_row_array(self, key): cls = type(self) @@ -1076,7 +1107,7 @@ def getitem(df, internal_indices=[]): # We can't just set the index to key here because there may be multiple # instances of a key. new_index = self.index[numeric_indices] - return cls(result, new_index, self.columns) + return cls(result, new_index, self.columns, self.dtypes) # END __getitem__ methods # __delitem__ and drop @@ -1102,6 +1133,7 @@ def delitem(df, internal_indices=[]): if columns is None: new_columns = self.columns + new_dtypes = self.dtypes else: def delitem(df, internal_indices=[]): return df.drop(columns=df.columns[internal_indices]) @@ -1111,7 +1143,9 @@ def delitem(df, internal_indices=[]): # We can't use self.columns.drop with duplicate keys because in Pandas # it throws an error. 
 new_columns = [self.columns[i] for i in range(len(self.columns)) if i not in numeric_indices] - return cls(new_data, new_index, new_columns) + new_dtypes = self.dtypes.drop(columns) + + return cls(new_data, new_index, new_columns, new_dtypes) # END __delitem__ and drop # Insert @@ -1129,9 +1163,54 @@ def insert(df, internal_indices=[]): new_data = self.data.apply_func_to_select_indices_along_full_axis(0, insert, loc, keep_remaining=True) new_columns = self.columns.insert(loc, column) - return cls(new_data, self.index, new_columns) + + # Because a Pandas Series does not allow insert, we make a DataFrame + # and insert the new dtype that way. + temp_dtypes = pandas.DataFrame(self.dtypes).T + temp_dtypes.insert(loc, column, _get_dtype_from_object(value)) + new_dtypes = temp_dtypes.iloc[0] + + return cls(new_data, self.index, new_columns, new_dtypes) # END Insert + # astype + # This method changes the types of select columns to the new dtype. + def astype(self, col_dtypes, errors='raise', **kwargs): + cls = type(self) + + # Group the indices to update together and create new dtypes series + dtype_indices = dict() + columns = col_dtypes.keys() + new_dtypes = self.dtypes.copy() + + numeric_indices = list(self.columns.get_indexer_for(columns)) + + for i, column in enumerate(columns): + dtype = col_dtypes[column] + if dtype in dtype_indices.keys(): + dtype_indices[dtype].append(numeric_indices[i]) + else: + dtype_indices[dtype] = [numeric_indices[i]] + new_dtype = np.dtype(dtype) + if dtype != np.int32 and new_dtype == np.int32: + new_dtype = np.dtype('int64') + elif dtype != np.float32 and new_dtype == np.float32: + new_dtype = np.dtype('float64') + new_dtypes[column] = new_dtype + new_data = self.data + for dtype in dtype_indices.keys(): + + def astype(df, internal_indices=[], dtype=dtype): + block_dtypes = dict() + for ind in internal_indices: + block_dtypes[df.columns[ind]] = dtype + return df.astype(block_dtypes) + + new_data = new_data.apply_func_to_select_indices(0, astype, dtype_indices[dtype], 
 keep_remaining=True) + + return cls(new_data, self.index, self.columns, new_dtypes) + # END astype + # UDF (apply and agg) methods # There is a wide range of behaviors that are supported, so a lot of the # logic can get a bit convoluted. @@ -1161,6 +1240,11 @@ def _post_process_apply(self, result_data, axis): # this logic here. if len(columns) == 0: series_result = result_data.to_pandas(False) + if not axis and len(series_result) == len(self.columns) and len(index) != len(series_result): + index = self.columns + elif axis and len(series_result) == len(self.index) and len(index) != len(series_result): + index = self.index + series_result.index = index return series_result @@ -1220,7 +1304,7 @@ def callable_apply_builder(df, func, axis, index, *args, **kwargs): result_data = self.map_across_full_axis(axis, func_prepared) return self._post_process_apply(result_data, axis) - #END UDF + # END UDF # Manual Partitioning methods (e.g. merge, groupby) # These methods require some sort of manual partitioning due to their diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 61d6d5fdf8e..ba381217c2d 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -47,7 +47,6 @@ def __init__(self, block_partitions=None, row_metadata=None, col_metadata=None, - dtypes_cache=None, data_manager=None): """Distributed DataFrame object backed by Pandas dataframes. 
@@ -77,8 +76,6 @@ def __init__(self, self._data_manager = data._data_manager return - self._dtypes_cache = dtypes_cache - # Check type of data and use appropriate constructor if data is not None or (col_partitions is None and row_partitions is None @@ -92,9 +89,6 @@ def __init__(self, dtype=dtype, copy=copy) - # Cache dtypes - self._dtypes_cache = pandas_df.dtypes - self._data_manager = from_pandas(pandas_df)._data_manager else: if data_manager is not None: @@ -124,11 +118,6 @@ def __init__(self, partitions = col_partitions axis_length = len(index) if index is not None else \ len(row_metadata) - # All partitions will already have correct dtypes - self._dtypes_cache = [ - _deploy_func.remote(lambda df: df.dtypes, pandas_df) - for pandas_df in col_partitions - ] # TODO: write explicit tests for "short and wide" # column partitions @@ -136,9 +125,6 @@ def __init__(self, _create_block_partitions(partitions, axis=axis, length=axis_length) - if self._dtypes_cache is None and data_manager is None: - self._get_remote_dtypes() - if data_manager is None: self._data_manager = RayPandasDataManager._from_old_block_partitions(self._block_partitions, index, columns) @@ -336,19 +322,12 @@ def ftypes(self): """ # The ftypes are common across all partitions. # The first partition will be enough. - result = ray.get( - _deploy_func.remote(lambda df: df.ftypes, self._row_partitions[0])) - result.index = self.columns + dtypes = self.dtypes.copy() + ftypes = ["{0}:dense".format(str(dtype)) + for dtype in dtypes.values] + result = pandas.Series(ftypes, index=self.columns) return result - def _get_remote_dtypes(self): - """Finds and caches ObjectIDs for the dtypes of each column partition. - """ - self._dtypes_cache = [ - _compile_remote_dtypes.remote(*column) - for column in self._block_partitions.T - ] - @property def dtypes(self): """Get the dtypes for this DataFrame. @@ -356,17 +335,7 @@ def dtypes(self): Returns: The dtypes for this DataFrame. 
""" - return [] - # assert self._dtypes_cache is not None - # - # if isinstance(self._dtypes_cache, list) and \ - # isinstance(self._dtypes_cache[0], - # ray.ObjectID): - # self._dtypes_cache = pandas.concat( - # ray.get(self._dtypes_cache), copy=False) - # self._dtypes_cache.index = self.columns - # - # return self._dtypes_cache + return self._data_manager.dtypes @property def empty(self): @@ -414,7 +383,6 @@ def _update_inplace(self, new_manager): old_manager = self._data_manager self._data_manager = new_manager old_manager.free() - # self._get_remote_dtypes() def add_prefix(self, prefix): """Add a prefix to each of the column names. @@ -922,42 +890,23 @@ def assign(self, **kwargs): "github.com/modin-project/modin.") def astype(self, dtype, copy=True, errors='raise', **kwargs): + col_dtypes = dict() if isinstance(dtype, dict): if (not set(dtype.keys()).issubset(set(self.columns)) and errors == 'raise'): raise KeyError("Only a column name can be used for the key in" "a dtype mappings argument.") - columns = list(dtype.keys()) - col_idx = [(self.columns.get_loc(columns[i]), columns[i]) if - columns[i] in self.columns else (columns[i], columns[i]) - for i in range(len(columns))] - new_dict = {} - for idx, key in col_idx: - new_dict[idx] = dtype[key] - new_rows = _map_partitions(lambda df, dt: df.astype(dtype=dt, - copy=True, - errors=errors, - **kwargs), - self._row_partitions, new_dict) - if copy: - return DataFrame( - row_partitions=new_rows, - columns=self.columns, - index=self.index) - self._row_partitions = new_rows + col_dtypes = dtype + + else: + for column in self.columns: + col_dtypes[column] = dtype + + new_data_manager = self._data_manager.astype(col_dtypes, errors, **kwargs) + if copy: + return DataFrame(data_manager=new_data_manager) else: - new_blocks = [_map_partitions(lambda d: d.astype(dtype=dtype, - copy=True, - errors=errors, - **kwargs), - block) - for block in self._block_partitions] - if copy: - return DataFrame( - block_partitions=new_blocks, 
- columns=self.columns, - index=self.index) - self._block_partitions = new_blocks + self._update_inplace(new_data_manager) def at_time(self, time, asof=False): raise NotImplementedError( @@ -1680,9 +1629,7 @@ def get_dtype_counts(self): Returns: The counts of dtypes in this object. """ - return ray.get( - _deploy_func.remote(lambda df: df.get_dtype_counts(), - self._row_partitions[0])) + return self.dtypes.value_counts() def get_ftype_counts(self): """Get the counts of ftypes in this object. @@ -1690,9 +1637,7 @@ def get_ftype_counts(self): Returns: The counts of ftypes in this object. """ - return ray.get( - _deploy_func.remote(lambda df: df.get_ftype_counts(), - self._row_partitions[0])) + return self.ftypes.value_counts() def get_value(self, index, col, takeable=False): raise NotImplementedError( diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py index bffac43a485..d49aad72618 100644 --- a/modin/pandas/test/test_dataframe.py +++ b/modin/pandas/test/test_dataframe.py @@ -69,8 +69,8 @@ def test_int_dataframe(): test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) test_ndim(ray_df, pandas_df) - # test_ftypes(ray_df, pandas_df) - # test_dtypes(ray_df, pandas_df) + test_ftypes(ray_df, pandas_df) + test_dtypes(ray_df, pandas_df) test_values(ray_df, pandas_df) test_axes(ray_df, pandas_df) test_shape(ray_df, pandas_df) @@ -129,8 +129,8 @@ def test_int_dataframe(): for key in keys: test_get(ray_df, pandas_df, key) - # test_get_dtype_counts(ray_df, pandas_df) - # test_get_ftype_counts(ray_df, pandas_df) + test_get_dtype_counts(ray_df, pandas_df) + test_get_ftype_counts(ray_df, pandas_df) # test_iterrows(ray_df, pandas_df) # test_items(ray_df, pandas_df) # test_iteritems(ray_df, pandas_df) @@ -1053,6 +1053,7 @@ def test_append(): @pytest.fixture def test_apply(ray_df, pandas_df, func, axis): + print(func) ray_result = ray_df.apply(func, axis) pandas_result = pandas_df.apply(func, axis) if isinstance(ray_result, pd.DataFrame):