diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 6adebf7da08..b3cd03015c8 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -7,7 +7,7 @@ from pandas.compat import string_types from pandas.core.dtypes.cast import find_common_type -from pandas.core.dtypes.common import (_get_dtype_from_object, is_list_like, is_numeric_dtype) +from pandas.core.dtypes.common import (_get_dtype_from_object, is_list_like, is_numeric_dtype, is_datetime_or_timedelta_dtype) from pandas.core.index import _ensure_index from .partitioning.partition_collections import BlockPartitions @@ -36,12 +36,12 @@ def __constructor__(self, block_paritions_object, index, columns, dtypes=None): def _get_dtype(self): if self._dtype_cache is None: - map_func = lambda df: df.dtypes - def func(row): - return find_common_type(row.values) + map_func = self._prepare_method(lambda df: df.dtypes) + def dtype_builder(df): + return df.apply(lambda row: find_common_type(row.values), axis=0) - self._dtype_cache = self.data.full_reduce(map_func, lambda df: df.apply(func, axis=0), 0) + self._dtype_cache = self.data.full_reduce(map_func, dtype_builder, 0) self._dtype_cache.index = self.columns return self._dtype_cache @@ -139,20 +139,48 @@ def helper(df, internal_indices=[]): return pandas_func(df, **kwargs) return helper - def numeric_indices(self): - """Returns the numeric indices - + def numeric_columns(self): + """Returns the numeric columns of the Manager. + Args: axis: The axis to extract the indices from. Returns: - List of index names + List of index names. """ columns = list() for col, dtype in zip(self.columns, self.dtypes): if is_numeric_dtype(dtype): columns.append(col) return columns + + def numeric_function_clean_dataframe(self, axis): + """Preprocesses numeric functions to clean dataframe and pick numeric indices. + + Args: + axis: '0' if columns and '1' if rows. 
+ + Returns: + Tuple with return value(if any), indices to apply func to & cleaned Manager. + """ + result = None + index = self.numeric_columns() + data_manager = self + + # If no numeric columns and over columns, then return empty Series + if not axis and len(index) == 0: + result = pandas.Series(dtype=np.float64) + + if axis: + nonnumeric = [col for col, dtype in zip(self.columns, self.dtypes) if not is_numeric_dtype(dtype)] + if len(nonnumeric) == len(self.columns): + # If over rows and no numeric columns, return this + result = pandas.Series([np.NaN for _ in self.index]) + else: + data_manager = self.drop(columns=nonnumeric) + index = data_manager.index + + return (result, index, data_manager) # END Internal methods # Metadata modification methods @@ -560,14 +588,24 @@ def transpose(self, *args, **kwargs): # we will implement a Distributed Series, and this will be returned # instead. def full_reduce(self, axis, map_func, reduce_func=None, numeric_only=False): + """Apply function that will reduce the data to a Pandas Series. + + Args: + axis: 0 for columns and 1 for rows. Default is 0. + map_func: Callable function to map the dataframe. + reduce_func: Callable function to reduce the dataframe. If none, + then apply map_func twice. + numeric_only: Apply only over the numeric rows. + + Return: + Returns Pandas Series containing the results from map_func and reduce_func. 
+ """ if numeric_only: - index = self.numeric_indices() - if len(index) == 0: - return pandas.Series(dtype=np.float64) - nonnumeric = [col for col, dtype in zip(self.columns, self.dtypes) if not is_numeric_dtype(dtype)] - if axis: - return self.drop(columns=nonnumeric).full_reduce(axis, map_func) + result, index, data_manager = self.numeric_function_clean_dataframe(axis) + if result is not None: + return result else: + data_manager = self if not axis: index = self.columns else: @@ -578,11 +616,20 @@ def full_reduce(self, axis, map_func, reduce_func=None, numeric_only=False): # The XOR here will ensure that we reduce over the correct axis that # exists on the internal partitions. We flip the axis - result = self.data.full_reduce(map_func, reduce_func, axis ^ self._is_transposed) + result = data_manager.data.full_reduce(map_func, reduce_func, axis ^ self._is_transposed) result.index = index return result def count(self, **kwargs): + """Counts the number of non-NaN objects for each column or row. + + Args: + axis: 0 for columns and 1 for rows. Default is 0. + numeric_only: If true, only considers numerical columns. + + Return: + Pandas series containing counts of non-NaN objects from each column or row. + """ axis = kwargs.get("axis", 0) numeric_only = kwargs.get("numeric_only", False) map_func = self._prepare_method(pandas.DataFrame.count, **kwargs) @@ -590,6 +637,15 @@ def count(self, **kwargs): return self.full_reduce(axis, map_func, reduce_func, numeric_only) def max(self, **kwargs): + """Returns the maximum value for each column or row. + + Args: + axis: 0 for columns and 1 for rows. Default is 0. + numeric_only: If true, only considers numerical columns. + + Return: + Pandas series with the maximum values from each column or row. 
+ """ # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) numeric_only = True if axis else kwargs.get("numeric_only", False) @@ -597,11 +653,17 @@ def max(self, **kwargs): return self.full_reduce(axis, func, numeric_only=numeric_only) def mean(self, **kwargs): + """Returns the mean for each numerical column or row. + + Args: + axis: 0 for columns and 1 for rows. Default is 0. + numeric_only: If true, only considers numerical columns. + + Return: + Pandas series containing the mean from each numerical column or row. + """ # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) - new_index = self.numeric_indices() - if len(new_index) == 0: - return pandas.Series(dtype=np.float64) def mean_builder(df, internal_indices=[], **kwargs): return pandas.DataFrame.mean(df, **kwargs) @@ -610,6 +672,15 @@ def mean_builder(df, internal_indices=[], **kwargs): return self.full_reduce(axis, func, numeric_only=True) def min(self, **kwargs): + """Returns the minimum from each column or row. + + Args: + axis: 0 for columns and 1 for rows. Default is 0. + numeric_only: If true, only considers numerical columns. + + Return: + Pandas series with the minimum value from each column or row. + """ # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) numeric_only = True if axis else kwargs.get("numeric_only", False) @@ -617,6 +688,15 @@ def min(self, **kwargs): return self.full_reduce(axis, func, numeric_only=numeric_only) def prod(self, **kwargs): + """Returns the product of each numerical column or row. + + Args: + axis: 0 for columns and 1 for rows. Default is 0. + numeric_only: If true, only considers numerical columns. + + Return: + Pandas series with the product of each numerical column or row. 
+ """ # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) index = self.index if axis else self.columns @@ -624,6 +704,15 @@ def prod(self, **kwargs): return self.full_reduce(axis, func, numeric_only=True) def sum(self, **kwargs): + """Returns the sum of each numerical column or row. + + Args: + axis: 0 for columns and 1 for rows. Default is 0 + numeric_only: If true, only considers numerical columns. + + Return: + Pandas series with the sum of each numerical column or row. + """ # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) numeric_only = True if axis else kwargs.get("numeric_only", False) @@ -680,6 +769,59 @@ def round(self, **kwargs): return self.map_partitions(func, new_dtypes=self._dtype_cache) # END Map partitions operations + # Map partitions across select indices + def astype(self, col_dtypes, errors='raise', **kwargs): + """Converts columns dtypes to given dtypes. + + Args: + col_dtypes: Dictionary of {col: dtype,...} where col is the column + name and dtype is a numpy dtype. + errors: Controlling the raising of errors. + + Returns: + DataFrame with updated dtypes. 
+ """ + # Group indices to update by dtype for less map operations + dtype_indices = dict() + columns = col_dtypes.keys() + numeric_indices = list(self.columns.get_indexer_for(columns)) + + # Create Series for the updated dtypes + new_dtypes = self.dtypes.copy() + + for i, column in enumerate(columns): + + dtype = col_dtypes[column] + if dtype != self.dtypes[column]: + # Only add dtype only if different + if dtype in dtype_indices.keys(): + dtype_indices[dtype].append(numeric_indices[i]) + else: + dtype_indices[dtype] = [numeric_indices[i]] + + # Update the new dtype series to the proper pandas dtype + new_dtype = np.dtype(dtype) + if dtype != np.int32 and new_dtype == np.int32: + new_dtype = np.dtype('int64') + elif dtype != np.float32 and new_dtype == np.float32: + new_dtype = np.dtype('float64') + new_dtypes[column] = new_dtype + + # Update partitions for each dtype that is updated + new_data = self.data + for dtype in dtype_indices.keys(): + + def astype(df, internal_indices=[]): + block_dtypes = dict() + for ind in internal_indices: + block_dtypes[df.columns[ind]] = dtype + return df.astype(block_dtypes) + + new_data = new_data.apply_func_to_select_indices(0, astype, dtype_indices[dtype], keep_remaining=True) + + return self.__constructor__(new_data, self.index, self.columns, new_dtypes) + # END Map partitions across select indices + # Column/Row partitions reduce operations # # These operations result in a reduced dimensionality of data. @@ -687,6 +829,14 @@ def round(self, **kwargs): # we will implement a Distributed Series, and this will be returned # instead. def full_axis_reduce(self, func, axis): + """Applies map that reduce Manager to series but require knowledge of full axis. + + Args: + func: Function to reduce the Manager by. This function takes in a Manager. + + Return: + Pandas series containing the reduced data. 
+ """ result = self.data.map_across_full_axis(axis, func).to_pandas(self._is_transposed) if not axis: @@ -697,17 +847,37 @@ def full_axis_reduce(self, func, axis): return result def all(self, **kwargs): + """Returns whether all the elements are true, potentially over an axis. + + Args: + axis: 0 for columns and 1 for rows. Defaults to 0. + + Return: + Pandas Series containing boolean values. + """ axis = kwargs.get("axis", 0) func = self._prepare_method(pandas.DataFrame.all, **kwargs) return self.full_axis_reduce(func, axis) def any(self, **kwargs): + """Returns whether any element is true over the requested axis. + + Args: + axis: 0 for columns and 1 for rows. Defaults to 0. + + Return: + Pandas Series containing boolean values. + """ axis = kwargs.get("axis", 0) func = self._prepare_method(pandas.DataFrame.any, **kwargs) return self.full_axis_reduce(func, axis) def first_valid_index(self): + """Returns index of first non-NaN/NULL value. + Return: + Scalar of index name. + """ # It may be possible to incrementally check each partition, but this # computation is fairly cheap. def first_valid_index_builder(df): @@ -722,12 +892,25 @@ def first_valid_index_builder(df): return self.index[first_result.min()] def _post_process_idx_ops(self, axis, intermediate_result): + """Converts internal index to external index. + + Args: + axis: 0 for columns and 1 for rows. Defaults to 0. + intermediate_result: Internal index of self.data. + + Returns: + External index of the intermediate_result. + """ index = self.index if not axis else self.columns result = intermediate_result.apply(lambda x: index[x]) return result def idxmax(self, **kwargs): + """Returns the first occurance of the maximum over requested axis. + Returns: + Series containing the maximum of each column or axis. + """ # The reason for the special treatment with idxmax/min is because we # need to communicate the row number back here. 
def idxmax_builder(df, **kwargs): @@ -742,7 +925,11 @@ def idxmax_builder(df, **kwargs): return self._post_process_idx_ops(axis, max_result) def idxmin(self, **kwargs): + """Returns the first occurrence of the minimum over requested axis. + Returns: + Series containing the minimum of each column or row. + """ # The reason for the special treatment with idxmax/min is because we # need to communicate the row number back here. def idxmin_builder(df, **kwargs): @@ -757,7 +944,11 @@ def idxmin_builder(df, **kwargs): return self._post_process_idx_ops(axis, min_result) def last_valid_index(self): + """Returns index of last non-NaN/NULL value. + Return: + Scalar of index name. + """ def last_valid_index_builder(df): df.index = pandas.RangeIndex(len(df.index)) return df.apply(lambda df: df.last_valid_index()) @@ -770,6 +961,11 @@ def last_valid_index_builder(df): return self.index[first_result.max()] def memory_usage(self, **kwargs): + """Returns the memory usage of each column. + + Returns: + Series containing the memory usage of each column. + """ def memory_usage_builder(df, **kwargs): return df.memory_usage(index=False, deep=deep) @@ -778,11 +974,21 @@ def memory_usage_builder(df, **kwargs): return self.full_axis_reduce(func, 0) def nunique(self, **kwargs): + """Returns the number of unique items over each column or row. + + Returns: + Series of ints indexed by column or index names. + """ axis = kwargs.get("axis", 0) func = self._prepare_method(pandas.DataFrame.nunique, **kwargs) return self.full_axis_reduce(func, axis) def to_datetime(self, **kwargs): + """Converts the Manager to a Series of DateTime objects. + + Returns: + Series of DateTime objects. + """ columns = self.columns def to_datetime_builder(df, **kwargs): df.columns = columns @@ -798,6 +1004,17 @@ def to_datetime_builder(df, **kwargs): # we will implement a Distributed Series, and this will be returned # instead. 
def full_axis_reduce_along_select_indices(self, func, axis, index, pandas_result=True): + """Reduce Manager along select indices using function that needs full axis. + + Args: + func: Callable that reduces Manager to Series using full knowledge of an axis. + axis: 0 for columns and 1 for rows. Defaults to 0. + index: Index of the resulting series. + pandas_result: Return the result as a Pandas Series instead of raw data. + + Returns: + Either a Pandas Series with index or BlockPartitions object. + """ # Convert indices to numeric indices old_index = self.index if axis else self.columns numeric_indices = [i for i, name in enumerate(old_index) if name in index] @@ -813,10 +1030,16 @@ def full_axis_reduce_along_select_indices(self, func, axis, index, pandas_result return result def describe(self, **kwargs): + """Generates descriptive statistics. + Returns: + DataFrame object containing the descriptive statistics of the DataFrame. + """ axis = 0 - new_index = self.numeric_indices() + # Only describe numeric if there are numeric + # Otherwise, describe all + new_index = self.numeric_columns() if len(new_index) != 0: numeric = True else: @@ -827,6 +1050,7 @@ def describe_builder(df, internal_indices=[], **kwargs): return pandas.DataFrame.describe(df, **kwargs) + # Apply describe and update indices, columns, and dtypes func = self._prepare_method(describe_builder, **kwargs) new_data = self.full_axis_reduce_along_select_indices(func, 0, new_index, False) new_index = self.compute_index(0, new_data, False) @@ -839,75 +1063,105 @@ def describe_builder(df, internal_indices=[], **kwargs): return self.__constructor__(new_data, new_index, new_columns, new_dtypes) def median(self, **kwargs): + """Returns median of each column or row. + + Returns: + Series containing the median of each column or row. 
+ """ # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) - new_index = self.numeric_indices() - if len(new_index) == 0: - return pandas.Series(dtype=np.float64) + result, index, data_manager = self.numeric_function_clean_dataframe(axis) + if result is not None: + return result def median_builder(df, internal_indices=[], **kwargs): return pandas.DataFrame.median(df, **kwargs) func = self._prepare_method(median_builder, **kwargs) - return self.full_axis_reduce_along_select_indices(func, axis, new_index) + return data_manager.full_axis_reduce_along_select_indices(func, axis, index) def skew(self, **kwargs): + """Returns skew of each column or row. + + Returns: + Series containing the skew of each column or row. + """ # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) - new_index = self.numeric_indices() - if len(new_index) == 0: - return pandas.Series(dtype=np.float64) + result, index, data_manager = self.numeric_function_clean_dataframe(axis) + if result is not None: + return result def skew_builder(df, internal_indices=[], **kwargs): return pandas.DataFrame.skew(df, **kwargs) func = self._prepare_method(skew_builder, internal_indices=[], **kwargs) - return self.full_axis_reduce_along_select_indices(func, axis, new_index) + return data_manager.full_axis_reduce_along_select_indices(func, axis, index) def std(self, **kwargs): + """Returns standard deviation of each column or row. + + Returns: + Series containing the standard deviation of each column or row. 
+ """ # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) - new_index = self.numeric_indices() - if len(new_index) == 0: - return pandas.Series(dtype=np.float64) + result, index, data_manager = self.numeric_function_clean_dataframe(axis) + if result is not None: + return result def std_builder(df, internal_indices=[], **kwargs): return pandas.DataFrame.std(df, **kwargs) func = self._prepare_method(std_builder, **kwargs) - return self.full_axis_reduce_along_select_indices(func, axis, new_index) + return data_manager.full_axis_reduce_along_select_indices(func, axis, index) def var(self, **kwargs): + """Returns varience of each column or row. + + Returns: + Series containing the varience of each column or row. + """ # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) - new_index = self.numeric_indices() - if len(new_index) == 0: - return pandas.Series(dtype=np.float64) + result, index, data_manager = self.numeric_function_clean_dataframe(axis) + if result is not None: + return result def var_builder(df, internal_indices=[], **kwargs): return pandas.DataFrame.var(df, **kwargs) - func = self._prepare_method(var_builder, **kwargs) - return self.full_axis_reduce_along_select_indices(func, axis, new_index) + func = data_manager._prepare_method(var_builder, **kwargs) + return data_manager.full_axis_reduce_along_select_indices(func, axis, index) def quantile_for_single_value(self, **kwargs): + """Returns quantile of each column or row. + + Returns: + Series containing the quantile of each column or row. 
+ """ axis = kwargs.get("axis", 0) q = kwargs.get("q", 0.5) + numeric_only = kwargs.get("numeric_only", True) assert type(q) is float - new_index = self.numeric_indices() - if len(new_index) == 0: - return pandas.Series(dtype=np.float64) + if numeric_only: + result, new_index, data_manager = self.numeric_function_clean_dataframe(axis) + if result is not None: + return result + else: + new_index = [col for col, dtype in zip(self.columns, self.dtypes) if (is_numeric_dtype(dtype) or is_datetime_or_timedelta_dtype(dtype))] + data_manager = self def quantile_builder(df, internal_indices=[], **kwargs): return pandas.DataFrame.quantile(df, **kwargs) func = self._prepare_method(quantile_builder, **kwargs) - result = self.full_axis_reduce_along_select_indices(func, axis, new_index) + result = data_manager.full_axis_reduce_along_select_indices(func, axis, new_index) result.name = q return result # END Column/Row partitions reduce operations over select indices @@ -1070,27 +1324,15 @@ def fillna_dict_builder(df, func_dict={}): new_data = self.map_across_full_axis(axis, func) return self.__constructor__(new_data, self.index, self.columns) - def quantile_for_list_of_values(self, **kwargs): - - axis = kwargs.get("axis", 0) - q = kwargs.get("q", 0.5) - assert isinstance(q, (pandas.Series, np.ndarray, pandas.Index, list)) - - index = self.index if axis else self.columns - new_columns = list() - for i, dtype in enumerate(self.dtypes): - if is_numeric_dtype(dtype): - new_columns.append(index[i]) - - func = self._prepare_method(pandas.DataFrame.quantile, **kwargs) - - q_index = pandas.Float64Index(q) - - new_data = self.map_across_full_axis(axis, func) - return self.__constructor__(new_data, q_index, new_columns) - def query(self, expr, **kwargs): + """Query columns of the DataManager with a boolean expression. + Args: + expr: Boolean expression to query the columns with. + + Returns: + DataManager containing the rows where the boolean expression is satisfied. 
+ """ columns = self.columns def query_builder(df, **kwargs): @@ -1111,8 +1353,11 @@ def query_builder(df, **kwargs): return self.__constructor__(new_data, new_index, self.columns, self.dtypes) def rank(self, **kwargs): + """Computes numerical rank along axis. Equal values are set to the average. - + Returns: + DataManager containing the ranks of the values along an axis. + """ axis = kwargs.get("axis", 0) numeric_only = True if axis else kwargs.get("numeric_only", False) @@ -1129,52 +1374,73 @@ def rank(self, **kwargs): return self.__constructor__(new_data, self.index, new_columns, new_dtypes) # END Map across rows/columns - # Map across select rows/columns + # Map across rows/columns # These operations require some global knowledge of the full column/row # that is being operated on. This means that we have to put all of that # data in the same place. - def astype(self, col_dtypes, errors='raise', **kwargs): + def map_across_full_axis_select_indices(self, axis, func, indices, keep_remaining=False): + """Maps function to select indices along full axis. + Args: + axis: 0 for columns and 1 for rows. + func: Callable mapping function over the BlockParitions. + indices: indices along axis to map over. + keep_remaining: True if keep indices where function was not applied. - # Group the indicies to update together and create new dtypes series - dtype_indices = dict() - columns = col_dtypes.keys() - new_dtypes = self.dtypes.copy() - - numeric_indices = list(self.columns.get_indexer_for(columns)) + Returns: + BlockPartitions containing the result of mapping func over axis on indices. 
+ """ + return self.data.apply_func_to_select_indices_along_full_axis(axis, func, indices, keep_remaining) + - for i, column in enumerate(columns): - dtype = col_dtypes[column] - if dtype != self.dtypes[column]: - if dtype in dtype_indices.keys(): - dtype_indices[dtype].append(numeric_indices[i]) - else: - dtype_indices[dtype] = [numeric_indices[i]] - new_dtype = np.dtype(dtype) - if dtype != np.int32 and new_dtype == np.int32: - new_dtype = np.dtype('int64') - elif dtype != np.float32 and new_dtype == np.float32: - new_dtype = np.dtype('float64') - new_dtypes[column] = new_dtype + def quantile_for_list_of_values(self, **kwargs): + """Returns Manager containing quantiles along an axis for numeric columns. - new_data = self.data - for dtype in dtype_indices.keys(): - resulting_dtype = None + Returns: + DataManager containing quantiles of original DataManager along an axis. + """ + axis = kwargs.get("axis", 0) + q = kwargs.get("q") + numeric_only = kwargs.get("numeric_only", True) + assert isinstance(q, (pandas.Series, np.ndarray, pandas.Index, list)) - def astype(df, internal_indices=[]): - block_dtypes = dict() - for ind in internal_indices: - block_dtypes[df.columns[ind]] = dtype - return df.astype(block_dtypes) + if numeric_only: + new_columns = self.numeric_columns() + else: + new_columns = [col for col, dtype in zip(self.columns, self.dtypes) if (is_numeric_dtype(dtype) or is_datetime_or_timedelta_dtype(dtype))] + index = self.index if axis else self.columns + if axis: + # If along rows, then drop the nonnumeric columns, record the index, and take transpose. + # We have to do this because if we don't, the result is all in one column for some reason. 
+ nonnumeric = [col for col, dtype in zip(self.columns, self.dtypes) if not is_numeric_dtype(dtype)] + data_manager = self.drop(columns=nonnumeric) + new_columns = data_manager.index + numeric_indices = list(data_manager.index.get_indexer_for(new_columns)) + data_manager = data_manager.transpose() + kwargs.pop("axis") + else: + data_manager = self + numeric_indices = list(self.columns.get_indexer_for(new_columns)) - new_data = new_data.apply_func_to_select_indices(0, astype, dtype_indices[dtype], keep_remaining=True) + def quantile_builder(df, internal_indices=[], **kwargs): + return pandas.DataFrame.quantile(df, **kwargs) - return self.__constructor__(new_data, self.index, self.columns, new_dtypes) + func = self._prepare_method(quantile_builder, **kwargs) + q_index = pandas.Float64Index(q) + new_data = data_manager.map_across_full_axis_select_indices(0, func, numeric_indices) + return self.__constructor__(new_data, q_index, new_columns) # END Map across rows/columns # Head/Tail/Front/Back def head(self, n): + """Returns the first n rows. + Args: + n: Integer containing the number of rows to return. + + Returns: + DataManager containing the first n rows of the original DataManager. + """ # We grab the front if it is transposed and flag as transposed so that # we are not physically updating the data from this manager. This # allows the implementation to stay modular and reduces data copying. @@ -1190,7 +1456,14 @@ def head(self, n): return result def tail(self, n): + """Returns the last n rows. + + Args: + n: Integer containing the number of rows to return. + Returns: + DataManager containing the last n rows of the original DataManager. + """ # See head for an explanation of the transposed behavior if self._is_transposed: result = self.__constructor__(self.data.transpose().take(1, -n).transpose(), self.index[-n:], self.columns, self._dtype_cache) @@ -1201,7 +1474,14 @@ def tail(self, n): return result def front(self, n): + """Returns the first n columns. 
+ + Args: + n: Integer containing the number of columns to return. + Returns: + DataManager containing the first n columns of the original DataManager. + """ # See head for an explanation of the transposed behavior if self._is_transposed: result = self.__constructor__(self.data.transpose().take(0, n).transpose(), self.index, self.columns[:n], self.dtypes[:n]) @@ -1211,7 +1491,14 @@ def back(self, n): + """Returns the last n columns. + + Args: + n: Integer containing the number of columns to return. + Returns: + DataManager containing the last n columns of the original DataManager. + """ # See head for an explanation of the transposed behavior if self._is_transposed: result = self.__constructor__(self.data.transpose().take(0, -n).transpose(), self.index, self.columns[-n:], self.dtypes[-n:]) @@ -1231,6 +1518,11 @@ def free(self): # To/From Pandas def to_pandas(self): + """Converts Modin DataFrame to Pandas DataFrame. + + Returns: + Pandas DataFrame of the DataManager. + """ df = self.data.to_pandas(is_transposed=self._is_transposed) df.index = self.index df.columns = self.columns @@ -1238,6 +1530,16 @@ @classmethod def from_pandas(cls, df, block_partitions_cls): + """Converts a Pandas DataFrame into a new DataManager. + + Args: + cls: DataManager object to convert the DataFrame to. + df: Pandas DataFrame object. + block_partitions_cls: BlockPartitions object to store partitions. + + Returns: + Returns DataManager containing data from the Pandas DataFrame. 
+ """ new_index = df.index new_columns = df.columns new_dtypes = df.dtypes diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index c9acf920c91..bd13dbd5dcb 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -8,7 +8,7 @@ import pandas.core.common as com from pandas.core.dtypes.common import (_get_dtype_from_object, is_bool_dtype, is_list_like, is_numeric_dtype, - is_timedelta64_dtype) + is_datetime_or_timedelta_dtype, is_dtype_equal) from pandas.core.index import _ensure_index_from_sequences from pandas.core.indexing import (check_bool_indexer, convert_to_index_sliceable) @@ -631,7 +631,7 @@ def align(self, "To contribute to Pandas on Ray, please visit " "github.com/modin-project/modin.") - def all(self, axis=None, bool_only=None, skipna=None, level=None, + def all(self, axis=0, bool_only=None, skipna=None, level=None, **kwargs): """Return whether all elements are True over requested axis @@ -639,15 +639,21 @@ def all(self, axis=None, bool_only=None, skipna=None, level=None, If axis=None or axis=0, this call applies df.all(axis=1) to the transpose of df. """ - axis = pandas.DataFrame()._get_axis_number( - axis) if axis is not None else 0 + if axis is not None: + axis = pandas.DataFrame()._get_axis_number(axis) + else: + axis = None - return self._data_manager.all( + result = self._data_manager.all( axis=axis, bool_only=bool_only, skipna=skipna, level=level, **kwargs) + if axis is not None: + return result + else: + return result.all() def any(self, axis=None, bool_only=None, skipna=None, level=None, **kwargs): @@ -2611,26 +2617,27 @@ def quantile(self, are the quantiles. 
""" - def check_bad_dtype(t): - return t == np.dtype('O') or is_timedelta64_dtype(t) + def check_dtype(t): + return (is_numeric_dtype(t) or is_datetime_or_timedelta_dtype(t)) if not numeric_only: - # check if there are any object columns - if all(check_bad_dtype(t) for t in self.dtypes): + # If not numeric_only and columns, then check all columns are either numeric, timestamp, or timedelta + if not axis and not all(check_dtype(t) for t in self.dtypes): raise TypeError("can't multiply sequence by non-int of type " "'float'") - else: - if next((True for t in self.dtypes if check_bad_dtype(t)), - False): - dtype = next(t for t in self.dtypes if check_bad_dtype(t)) - raise ValueError( - "Cannot compare type '{}' with type '{}'".format( - type(dtype), float)) + + # If over rows, then make sure that all dtypes are equal for not numeric_only + elif axis: + for i in range(1, len(self.dtypes)): + pre_dtype = self.dtypes[i-1] + curr_dtype = self.dtypes[i] + if not is_dtype_equal(pre_dtype, curr_dtype): + raise TypeError("Cannot compare type '{0}' with type '{1}'".format(pre_dtype, curr_dtype)) else: # Normally pandas returns this near the end of the quantile, but we # can't afford the overhead of running the entire operation before # we error. - if all(check_bad_dtype(t) for t in self.dtypes): + if not any(is_numeric_dtype(t) for t in self.dtypes): raise ValueError("need at least one array to concatenate") # check that all qs are between 0 and 1 @@ -4452,7 +4459,7 @@ def __neg__(self): """ for t in self.dtypes: if not (is_bool_dtype(t) or is_numeric_dtype(t) - or is_timedelta64_dtype(t)): + or is_datetime_or_timedelta_dtype(t)): raise TypeError( "Unary negative expects numeric dtype, not {}".format(t))