diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 3bb293d6841..b576b17ff89 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -7,7 +7,7 @@ from pandas.compat import string_types from pandas.core.dtypes.cast import find_common_type -from pandas.core.dtypes.common import (_get_dtype_from_object, is_list_like) +from pandas.core.dtypes.common import (_get_dtype_from_object, is_list_like, is_numeric_dtype) from pandas.core.index import _ensure_index from .partitioning.partition_collections import BlockPartitions, RayBlockPartitions @@ -117,10 +117,29 @@ def _prepare_method(self, pandas_func, **kwargs): :param kwargs: :return: """ + if self._is_transposed: - return lambda df: pandas_func(df.T, **kwargs) + def helper(df, internal_indices=[]): + return pandas_func(df.T, **kwargs) else: - return lambda df: pandas_func(df, **kwargs) + def helper(df, internal_indices=[]): + return pandas_func(df, **kwargs) + return helper + + def numeric_indices(self): + """Returns the numeric indices + + Args: + axis: The axis to extract the indices from. + + Returns: + List of index names + """ + columns = list() + for col, dtype in zip(self.columns, self.dtypes): + if is_numeric_dtype(dtype): + columns.append(col) + return columns # END Internal methods # Metadata modification methods @@ -311,55 +330,6 @@ def _inter_df_op_handler(self, func, other, **kwargs): else: return self.scalar_operations(axis, other, lambda df: func(df, other, **kwargs)) - def where(self, cond, other, **kwargs): - cls = type(self) - - assert isinstance(cond, type(self)), \ - "Must have the same DataManager subclass to perform this operation" - - if isinstance(other, type(self)): - # Note: Currently we are doing this with two maps across the entire - # data. This can be done with a single map, but it will take a - # modification in the `BlockPartition` class. - # If this were in one pass it would be ~2x faster. - # TODO rewrite this to take one pass. - def where_builder_first_pass(cond, other, **kwargs): - return cond.where(cond, other, **kwargs) - - def where_builder_second_pass(df, new_other, **kwargs): - return df.where(new_other == True, new_other, **kwargs) - - # We are required to perform this reindexing on everything to - # shuffle the data together - reindexed_cond = cond.reindex(0, self.index).data - reindexed_other = other.reindex(0, self.index).data - reindexed_self = self.reindex(0, self.index).data - - first_pass = reindexed_cond.inter_data_operation(1, lambda l, r: where_builder_first_pass(l, r, **kwargs), reindexed_other) - final_pass = reindexed_self.inter_data_operation(1, lambda l, r: where_builder_second_pass(l, r, **kwargs), first_pass) - return cls(final_pass, self.index, self.columns) - else: - axis = kwargs.get("axis", 0) - - def where_builder_series(df, cond, other, **kwargs): - return df.where(cond, other, **kwargs) - - reindexed_self = self.reindex(axis, self.index if not axis else self.columns).data - reindexed_cond = cond.reindex(axis, self.index if not axis else self.columns).data - - new_data = reindexed_self.inter_data_operation(axis, lambda l, r: where_builder_series(l, r, other, **kwargs), reindexed_cond) - return cls(new_data, self.index, self.columns) - - def update(self, other, **kwargs): - assert isinstance(other, type(self)), \ - "Must have the same DataManager subclass to perform this operation" - - def update_builder(df, other, **kwargs): - df.update(other, **kwargs) - return df - - return self._inter_df_op_handler(update_builder, other, **kwargs) - def add(self, other, **kwargs): # TODO: need to write a prepare_function for inter_df operations func = pandas.DataFrame.add @@ -369,10 +339,6 @@ def div(self, other, **kwargs): func = pandas.DataFrame.div return self._inter_df_op_handler(func, other, **kwargs) - def rdiv(self, other, **kwargs): - func = pandas.DataFrame.rdiv - return self._inter_df_op_handler(func, other, **kwargs) - def eq(self, other, **kwargs): func = pandas.DataFrame.eq return self._inter_df_op_handler(func, other, **kwargs) @@ -413,22 +379,74 @@ def pow(self, other, **kwargs): func = pandas.DataFrame.pow return self._inter_df_op_handler(func, other, **kwargs) - def rpow(self, other, **kwargs): - func = pandas.DataFrame.rpow + def rdiv(self, other, **kwargs): + func = pandas.DataFrame.rdiv return self._inter_df_op_handler(func, other, **kwargs) - def sub(self, other, **kwargs): - func = pandas.DataFrame.sub + def rpow(self, other, **kwargs): + func = pandas.DataFrame.rpow return self._inter_df_op_handler(func, other, **kwargs) def rsub(self, other, **kwargs): func = pandas.DataFrame.rsub return self._inter_df_op_handler(func, other, **kwargs) + def sub(self, other, **kwargs): + func = pandas.DataFrame.sub + return self._inter_df_op_handler(func, other, **kwargs) + def truediv(self, other, **kwargs): func = pandas.DataFrame.truediv return self._inter_df_op_handler(func, other, **kwargs) + def update(self, other, **kwargs): + assert isinstance(other, type(self)), \ + "Must have the same DataManager subclass to perform this operation" + + def update_builder(df, other, **kwargs): + df.update(other, **kwargs) + return df + + return self._inter_df_op_handler(update_builder, other, **kwargs) + + def where(self, cond, other, **kwargs): + cls = type(self) + + assert isinstance(cond, type(self)), \ + "Must have the same DataManager subclass to perform this operation" + + if isinstance(other, type(self)): + # Note: Currently we are doing this with two maps across the entire + # data. This can be done with a single map, but it will take a + # modification in the `BlockPartition` class. + # If this were in one pass it would be ~2x faster. + # TODO rewrite this to take one pass. + def where_builder_first_pass(cond, other, **kwargs): + return cond.where(cond, other, **kwargs) + + def where_builder_second_pass(df, new_other, **kwargs): + return df.where(new_other == True, new_other, **kwargs) + + # We are required to perform this reindexing on everything to + # shuffle the data together + reindexed_cond = cond.reindex(0, self.index).data + reindexed_other = other.reindex(0, self.index).data + reindexed_self = self.reindex(0, self.index).data + + first_pass = reindexed_cond.inter_data_operation(1, lambda l, r: where_builder_first_pass(l, r, **kwargs), reindexed_other) + final_pass = reindexed_self.inter_data_operation(1, lambda l, r: where_builder_second_pass(l, r, **kwargs), first_pass) + return cls(final_pass, self.index, self.columns) + else: + axis = kwargs.get("axis", 0) + + def where_builder_series(df, cond, other, **kwargs): + return df.where(cond, other, **kwargs) + + reindexed_self = self.reindex(axis, self.index if not axis else self.columns).data + reindexed_cond = cond.reindex(axis, self.index if not axis else self.columns).data + + new_data = reindexed_self.inter_data_operation(axis, lambda l, r: where_builder_series(l, r, other, **kwargs), reindexed_cond) + return cls(new_data, self.index, self.columns) # END Inter-Data operations # Single Manager scalar operations (e.g. add to scalar, list of scalars) @@ -522,11 +540,19 @@ def transpose(self, *args, **kwargs): # Currently, this means a Pandas Series will be returned, but in the future # we will implement a Distributed Series, and this will be returned # instead. - def full_reduce(self, axis, map_func, reduce_func=None): - if not axis: - index = self.columns + def full_reduce(self, axis, map_func, reduce_func=None, numeric_only=False): + if numeric_only: + index = self.numeric_indices() + if len(index) == 0: + return pandas.Series(dtype=np.float64) + nonnumeric = [col for col, dtype in zip(self.columns, self.dtypes) if not is_numeric_dtype(dtype)] + if axis: + return self.drop(columns=nonnumeric).full_reduce(axis, map_func) else: - index = self.index + if not axis: + index = self.columns + else: + index = self.index if reduce_func is None: reduce_func = map_func @@ -539,40 +565,51 @@ def full_reduce(self, axis, map_func, reduce_func=None): def count(self, **kwargs): axis = kwargs.get("axis", 0) + numeric_only = kwargs.get("numeric_only", False) map_func = self._prepare_method(pandas.DataFrame.count, **kwargs) reduce_func = self._prepare_method(pandas.DataFrame.sum, **kwargs) - return self.full_reduce(axis, map_func, reduce_func) + return self.full_reduce(axis, map_func, reduce_func, numeric_only) def max(self, **kwargs): # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) + numeric_only = True if axis else kwargs.get("numeric_only", False) func = self._prepare_method(pandas.DataFrame.max, **kwargs) - return self.full_reduce(axis, func) + return self.full_reduce(axis, func, numeric_only=numeric_only) def mean(self, **kwargs): # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) - length = len(self.index) if not axis else len(self.columns) + new_index = self.numeric_indices() + if len(new_index) == 0: + return pandas.Series(dtype=np.float64) - return self.sum(**kwargs) / length + def mean_builder(df, internal_indices=[], **kwargs): + return pandas.DataFrame.mean(df, **kwargs) + + func = self._prepare_method(mean_builder, **kwargs) + return self.full_reduce(axis, func, numeric_only=True) def min(self, **kwargs): # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) + numeric_only = True if axis else kwargs.get("numeric_only", False) func = self._prepare_method(pandas.DataFrame.min, **kwargs) - return self.full_reduce(axis, func) + return self.full_reduce(axis, func, numeric_only=numeric_only) def prod(self, **kwargs): # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) + index = self.index if axis else self.columns func = self._prepare_method(pandas.DataFrame.prod, **kwargs) - return self.full_reduce(axis, func) + return self.full_reduce(axis, func, numeric_only=True) def sum(self, **kwargs): # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) + numeric_only = True if axis else kwargs.get("numeric_only", False) func = self._prepare_method(pandas.DataFrame.sum, **kwargs) - return self.full_reduce(axis, func) + return self.full_reduce(axis, func, numeric_only=numeric_only) # END Full Reduce operations # Map partitions operations @@ -640,11 +677,6 @@ def full_axis_reduce(self, func, axis): return result - def _post_process_idx_ops(self, axis, intermediate_result): - index = self.index if not axis else self.columns - result = intermediate_result.apply(lambda x: index[x]) - return result - def all(self, **kwargs): axis = kwargs.get("axis", 0) func = self._prepare_method(pandas.DataFrame.all, **kwargs) @@ -655,6 +687,26 @@ def any(self, **kwargs): func = self._prepare_method(pandas.DataFrame.any, **kwargs) return self.full_axis_reduce(func, axis) + def first_valid_index(self): + + # It may be possible to incrementally check each partition, but this + # computation is fairly cheap. + def first_valid_index_builder(df): + df.index = pandas.RangeIndex(len(df.index)) + return df.apply(lambda df: df.first_valid_index()) + + func = self._prepare_method(first_valid_index_builder) + # We get the minimum from each column, then take the min of that to get + # first_valid_index. + first_result = self.full_axis_reduce(func, 0) + + return self.index[first_result.min()] + + def _post_process_idx_ops(self, axis, intermediate_result): + index = self.index if not axis else self.columns + result = intermediate_result.apply(lambda x: index[x]) + return result + def idxmax(self, **kwargs): # The reason for the special treatment with idxmax/min is because we @@ -705,21 +757,6 @@ def info_builder(df, **kwargs): func = self._prepare_method(info_builder, **kwargs) return self.full_axis_reduce(func, 0) - def first_valid_index(self): - - # It may be possible to incrementally check each partition, but this - # computation is fairly cheap. - def first_valid_index_builder(df): - df.index = pandas.RangeIndex(len(df.index)) - return df.apply(lambda df: df.first_valid_index()) - - func = self._prepare_method(first_valid_index_builder) - # We get the minimum from each column, then take the min of that to get - # first_valid_index. - first_result = self.full_axis_reduce(func, 0) - - return self.index[first_result.min()] - def last_valid_index(self): def last_valid_index_builder(df): @@ -733,12 +770,6 @@ def last_valid_index_builder(df): return self.index[first_result.max()] - def median(self, **kwargs): - # Pandas default is 0 (though not mentioned in docs) - axis = kwargs.get("axis", 0) - func = self._prepare_method(pandas.DataFrame.median, **kwargs) - return self.full_axis_reduce(func, axis) - def memory_usage(self, **kwargs): def memory_usage_builder(df, **kwargs): return df.memory_usage(index=False, deep=deep) @@ -752,18 +783,6 @@ def nunique(self, **kwargs): func = self._prepare_method(pandas.DataFrame.nunique, **kwargs) return self.full_axis_reduce(func, axis) - def skew(self, **kwargs): - # Pandas default is 0 (though not mentioned in docs) - axis = kwargs.get("axis", 0) - func = self._prepare_method(pandas.DataFrame.skew, **kwargs) - return self.full_axis_reduce(func, axis) - - def std(self, **kwargs): - # Pandas default is 0 (though not mentioned in docs) - axis = kwargs.get("axis", 0) - func = self._prepare_method(pandas.DataFrame.std, **kwargs) - return self.full_axis_reduce(func, axis) - def to_datetime(self, **kwargs): columns = self.columns def to_datetime_builder(df, **kwargs): @@ -771,104 +790,135 @@ def to_datetime_builder(df, **kwargs): return pandas.to_datetime(df, **kwargs) func = self._prepare_method(to_datetime_builder, **kwargs) return self.full_axis_reduce(func, 1) + # END Column/Row partitions reduce operations - def var(self, **kwargs): + # Column/Row partitions reduce operations over select indices + # + # These operations result in a reduced dimensionality of data. + # Currently, this means a Pandas Series will be returned, but in the future + # we will implement a Distributed Series, and this will be returned + # instead. + def full_axis_reduce_along_select_indices(self, func, axis, index, pandas_result=True): + # Convert indices to numeric indices + old_index = self.index if axis else self.columns + numeric_indices = [i for i, name in enumerate(old_index) if name in index] + result = self.data.apply_func_to_select_indices_along_full_axis(axis, func, numeric_indices) + + if pandas_result: + result = result.to_pandas(self._is_transposed) + if not axis: + result.index = index + else: + result.index = index + + return result + + def describe(self, **kwargs): + cls = type(self) + axis = 0 + + new_index = self.numeric_indices() + if len(new_index) != 0: + numeric = True + else: + numeric = False + # If no numeric dtypes, then do all + new_index = self.columns + + def describe_builder(df, internal_indices=[], **kwargs): + return pandas.DataFrame.describe(df, **kwargs) + + func = self._prepare_method(describe_builder, **kwargs) + new_data = self.full_axis_reduce_along_select_indices(func, 0, new_index, False) + new_index = self.compute_index(0, new_data, False) + new_columns = self.compute_index(1, new_data, True) + if numeric: + new_dtypes = pandas.Series([np.float64 for _ in new_columns], index=new_columns) + else: + new_dtypes = pandas.Series([np.object for _ in new_columns], index=new_columns) + + return cls(new_data, new_index, new_columns, new_dtypes) + + def median(self, **kwargs): # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) - func = self._prepare_method(pandas.DataFrame.var, **kwargs) - return self.full_axis_reduce(func, axis) - def quantile_for_single_value(self, **kwargs): - axis = kwargs.get("axis", 0) - q = kwargs.get("q", 0.5) - assert type(q) is float + new_index = self.numeric_indices() + if len(new_index) == 0: + return pandas.Series(dtype=np.float64) - func = self._prepare_method(pandas.DataFrame.quantile, **kwargs) + def median_builder(df, internal_indices=[], **kwargs): + return pandas.DataFrame.median(df, **kwargs) - result = self.full_axis_reduce(func, axis) - result.name = q - return result - # END Column/Row partitions reduce operations + func = self._prepare_method(median_builder, **kwargs) + return self.full_axis_reduce_along_select_indices(func, axis, new_index) - # Map across rows/columns - # These operations require some global knowledge of the full column/row - # that is being operated on. This means that we have to put all of that - # data in the same place. - def map_across_full_axis(self, axis, func): - return self.data.map_across_full_axis(axis, func) + def skew(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", 0) - def query(self, expr, **kwargs): - cls = type(self) - columns = self.columns + new_index = self.numeric_indices() + if len(new_index) == 0: + return pandas.Series(dtype=np.float64) - def query_builder(df, **kwargs): - # This is required because of an Arrow limitation - # TODO revisit for Arrow error - df = df.copy() - df.index = pandas.RangeIndex(len(df)) - df.columns = columns - df.query(expr, inplace=True, **kwargs) - df.columns = pandas.RangeIndex(len(df.columns)) - return df + def skew_builder(df, internal_indices=[], **kwargs): + return pandas.DataFrame.skew(df, **kwargs) - func = self._prepare_method(query_builder, **kwargs) - new_data = self.map_across_full_axis(1, func) - # Query removes rows, so we need to update the index - new_index = self.compute_index(0, new_data, True) + func = self._prepare_method(skew_builder, internal_indices=[], **kwargs) + return self.full_axis_reduce_along_select_indices(func, axis, new_index) - return cls(new_data, new_index, self.columns, self.dtypes) + def std(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", 0) - def eval(self, expr, **kwargs): - cls = type(self) - inplace = kwargs.get("inplace", False) + new_index = self.numeric_indices() + if len(new_index) == 0: + return pandas.Series(dtype=np.float64) - columns = self.index if self._is_transposed else self.columns - index = self.columns if self._is_transposed else self.index + def std_builder(df, internal_indices=[], **kwargs): + return pandas.DataFrame.std(df, **kwargs) - # Dun eval on columns to determine result type - columns_copy = pandas.DataFrame(columns=self.columns) - columns_copy = columns_copy.eval(expr, inplace=False, **kwargs) - expect_series = isinstance(columns_copy, pandas.Series) + func = self._prepare_method(std_builder, **kwargs) + return self.full_axis_reduce_along_select_indices(func, axis, new_index) - # if there is no assignment, then we simply save the results - # in the first column - if expect_series: - if inplace: - raise ValueError("Cannot operate inplace if there is no assignment") - else: - expr = "{0} = {1}".format(columns[0], expr) + def var(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", 0) - def eval_builder(df, **kwargs): - df.columns = columns - result = df.eval(expr, inplace=False, **kwargs) - result.columns = pandas.RangeIndex(0, len(result.columns)) - return result + new_index = self.numeric_indices() + if len(new_index) == 0: + return pandas.Series(dtype=np.float64) - func = self._prepare_method(eval_builder, **kwargs) - new_data = self.map_across_full_axis(1, func) + def var_builder(df, internal_indices=[], **kwargs): + return pandas.DataFrame.var(df, **kwargs) - if expect_series: - result = new_data.to_pandas()[0] - result.name = columns_copy.name - result.index = index - return result - else: - columns = columns_copy.columns - return cls(new_data, self.index, columns) + func = self._prepare_method(var_builder, **kwargs) + return self.full_axis_reduce_along_select_indices(func, axis, new_index) - def quantile_for_list_of_values(self, **kwargs): - cls = type(self) + def quantile_for_single_value(self, **kwargs): axis = kwargs.get("axis", 0) q = kwargs.get("q", 0.5) - assert isinstance(q, (pandas.Series, np.ndarray, pandas.Index, list)) + assert type(q) is float - func = self._prepare_method(pandas.DataFrame.quantile, **kwargs) + new_index = self.numeric_indices() + if len(new_index) == 0: + return pandas.Series(dtype=np.float64) - q_index = pandas.Float64Index(q) + def quantile_builder(df, internal_indices=[], **kwargs): + return pandas.DataFrame.quantile(df, **kwargs) - new_data = self.map_across_full_axis(axis, func) - new_columns = self.columns if not axis else self.index - return cls(new_data, q_index, new_columns) + func = self._prepare_method(quantile_builder, **kwargs) + result = self.full_axis_reduce_along_select_indices(func, axis, new_index) + result.name = q + return result + # END Column/Row partitions reduce operations over select indices + + # Map across rows/columns + # These operations require some global knowledge of the full column/row + # that is being operated on. This means that we have to put all of that + # data in the same place. + def map_across_full_axis(self, axis, func): + return self.data.map_across_full_axis(axis, func) def _cumulative_builder(self, func, **kwargs): cls = type(self) @@ -889,6 +939,16 @@ def cummin(self, **kwargs): def cumprod(self, **kwargs): return self._cumulative_builder(pandas.DataFrame.cumprod, **kwargs) + def diff(self, **kwargs): + cls = type(self) + + axis = kwargs.get("axis", 0) + + func = self._prepare_method(pandas.DataFrame.diff, **kwargs) + new_data = self.map_across_full_axis(axis, func) + + return cls(new_data, self.index, self.columns) + def dropna(self, **kwargs): axis = kwargs.get("axis", 0) subset = kwargs.get("subset") @@ -930,6 +990,44 @@ def dropna(self, **kwargs): return self.drop(index=rm_from_index, columns=rm_from_columns) + def eval(self, expr, **kwargs): + cls = type(self) + inplace = kwargs.get("inplace", False) + + columns = self.index if self._is_transposed else self.columns + index = self.columns if self._is_transposed else self.index + + # Dun eval on columns to determine result type + columns_copy = pandas.DataFrame(columns=self.columns) + columns_copy = columns_copy.eval(expr, inplace=False, **kwargs) + expect_series = isinstance(columns_copy, pandas.Series) + + # if there is no assignment, then we simply save the results + # in the first column + if expect_series: + if inplace: + raise ValueError("Cannot operate inplace if there is no assignment") + else: + expr = "{0} = {1}".format(columns[0], expr) + + def eval_builder(df, **kwargs): + df.columns = columns + result = df.eval(expr, inplace=False, **kwargs) + result.columns = pandas.RangeIndex(0, len(result.columns)) + return result + + func = self._prepare_method(eval_builder, **kwargs) + new_data = self.map_across_full_axis(1, func) + + if expect_series: + result = new_data.to_pandas()[0] + result.name = columns_copy.name + result.index = index + return result + else: + columns = columns_copy.columns + return cls(new_data, self.index, columns) + def mode(self, **kwargs): cls = type(self) @@ -976,44 +1074,106 @@ def fillna_dict_builder(df, func_dict={}): new_data = self.map_across_full_axis(axis, func) return cls(new_data, self.index, self.columns) - def describe(self, **kwargs): + def quantile_for_list_of_values(self, **kwargs): cls = type(self) - axis = 0 + axis = kwargs.get("axis", 0) + q = kwargs.get("q", 0.5) + assert isinstance(q, (pandas.Series, np.ndarray, pandas.Index, list)) + + index = self.index if axis else self.columns + new_columns = list() + for i, dtype in enumerate(self.dtypes): + if is_numeric_dtype(dtype): + new_columns.append(index[i]) + + func = self._prepare_method(pandas.DataFrame.quantile, **kwargs) + + q_index = pandas.Float64Index(q) - func = self._prepare_method(pandas.DataFrame.describe, **kwargs) new_data = self.map_across_full_axis(axis, func) - new_index = self.compute_index(0, new_data, False) - new_columns = self.compute_index(1, new_data, True) - new_dtypes = pandas.Series([np.float64 for _ in new_columns], index=new_columns) + return cls(new_data, q_index, new_columns) - return cls(new_data, new_index, new_columns, new_dtypes) + def query(self, expr, **kwargs): + cls = type(self) + columns = self.columns + + def query_builder(df, **kwargs): + # This is required because of an Arrow limitation + # TODO revisit for Arrow error + df = df.copy() + df.index = pandas.RangeIndex(len(df)) + df.columns = columns + df.query(expr, inplace=True, **kwargs) + df.columns = pandas.RangeIndex(len(df.columns)) + return df + + func = self._prepare_method(query_builder, **kwargs) + new_data = self.map_across_full_axis(1, func) + # Query removes rows, so we need to update the index + new_index = self.compute_index(0, new_data, True) + + return cls(new_data, new_index, self.columns, self.dtypes) def rank(self, **kwargs): cls = type(self) axis = kwargs.get("axis", 0) + numeric_only = True if axis else kwargs.get("numeric_only", False) func = self._prepare_method(pandas.DataFrame.rank, **kwargs) new_data = self.map_across_full_axis(axis, func) # Since we assume no knowledge of internal state, we get the columns # from the internal partitions. - if kwargs.get("numeric_only", False): + if numeric_only: new_columns = self.compute_index(1, new_data, True) else: new_columns = self.columns new_dtypes = pandas.Series([np.float64 for _ in new_columns], index=new_columns) return cls(new_data, self.index, new_columns, new_dtypes) + # END Map across rows/columns - def diff(self, **kwargs): + # Map across select rows/columns + # These operations require some global knowledge of the full column/row + # that is being operated on. This means that we have to put all of that + # data in the same place. + def astype(self, col_dtypes, errors='raise', **kwargs): cls = type(self) - axis = kwargs.get("axis", 0) + # Group the indicies to update together and create new dtypes series + dtype_indices = dict() + columns = col_dtypes.keys() + new_dtypes = self.dtypes.copy() - func = self._prepare_method(pandas.DataFrame.diff, **kwargs) - new_data = self.map_across_full_axis(axis, func) + numeric_indices = list(self.columns.get_indexer_for(columns)) - return cls(new_data, self.index, self.columns) + for i, column in enumerate(columns): + dtype = col_dtypes[column] + if dtype != self.dtypes[column]: + if dtype in dtype_indices.keys(): + dtype_indices[dtype].append(numeric_indices[i]) + else: + dtype_indices[dtype] = [numeric_indices[i]] + new_dtype = np.dtype(dtype) + if dtype != np.int32 and new_dtype == np.int32: + new_dtype = np.dtype('int64') + elif dtype != np.float32 and new_dtype == np.float32: + new_dtype = np.dtype('float64') + new_dtypes[column] = new_dtype + + new_data = self.data + for dtype in dtype_indices.keys(): + resulting_dtype = None + + def astype(df, internal_indices=[]): + block_dtypes = dict() + for ind in internal_indices: + block_dtypes[df.columns[ind]] = dtype + return df.astype(block_dtypes) + + new_data = new_data.apply_func_to_select_indices(0, astype, dtype_indices[dtype], keep_remaining=True) + + return cls(new_data, self.index, self.columns, new_dtypes) # END Map across rows/columns # Head/Tail/Front/Back @@ -1197,47 +1357,6 @@ def insert(df, internal_indices=[]): return cls(new_data, self.index, new_columns, new_dtypes) # END Insert - # astype - # This method changes the types of select columns to the new dtype. - def astype(self, col_dtypes, errors='raise', **kwargs): - cls = type(self) - - # Group the indicies to update together and create new dtypes series - dtype_indices = dict() - columns = col_dtypes.keys() - new_dtypes = self.dtypes.copy() - - numeric_indices = list(self.columns.get_indexer_for(columns)) - - for i, column in enumerate(columns): - dtype = col_dtypes[column] - if dtype != self.dtypes[column]: - if dtype in dtype_indices.keys(): - dtype_indices[dtype].append(numeric_indices[i]) - else: - dtype_indices[dtype] = [numeric_indices[i]] - new_dtype = np.dtype(dtype) - if dtype != np.int32 and new_dtype == np.int32: - new_dtype = np.dtype('int64') - elif dtype != np.float32 and new_dtype == np.float32: - new_dtype = np.dtype('float64') - new_dtypes[column] = new_dtype - - new_data = self.data - for dtype in dtype_indices.keys(): - resulting_dtype = None - - def astype(df, internal_indices=[]): - block_dtypes = dict() - for ind in internal_indices: - block_dtypes[df.columns[ind]]= dtype - return df.astype(block_dtypes) - - new_data = self.data.apply_func_to_select_indices(0, astype, dtype_indices[dtype], keep_remaining=True) - - return cls(new_data, self.index, self.columns, new_dtypes) - # END type conversions - # UDF (apply and agg) methods # There is a wide range of behaviors that are supported, so a lot of the # logic can get a bit convoluted. diff --git a/modin/data_management/partitioning/partition_collections.py b/modin/data_management/partitioning/partition_collections.py index 9cfaeafbb1b..e4c3b045d2e 100644 --- a/modin/data_management/partitioning/partition_collections.py +++ b/modin/data_management/partitioning/partition_collections.py @@ -328,16 +328,19 @@ def to_pandas(self, is_transposed=False): raise ValueError("Some partitions contain Series and some contain DataFrames") df_rows = [pandas.concat([part for part in row], axis=axis) for row in retrieved_objects] - result = pandas.concat(df_rows) - return result + + if len(df_rows) == 0: + return pandas.DataFrame() + else: + return pandas.concat(df_rows) @classmethod def from_pandas(cls, df): num_splits = cls._compute_num_partitions() put_func = cls._partition_class.put - row_chunksize = compute_chunksize(len(df), num_splits) - col_chunksize = compute_chunksize(len(df.columns), num_splits) + row_chunksize = max(1, compute_chunksize(len(df), num_splits)) + col_chunksize = max(1, compute_chunksize(len(df.columns), num_splits)) # Each chunk must have a RangeIndex that spans its length and width # according to our invariant. diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 10fe9685f35..2cbe0a3925e 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1622,7 +1622,11 @@ def get_dtype_counts(self): Returns: The counts of dtypes in this object. """ - return self.dtypes.value_counts() + result = self.dtypes.value_counts() + result.index = result.index.map(lambda x: str(x)) + result = result.sort_index() + result.index = result.index.map(lambda x: np.dtype(getattr(np, x))) + return result def get_ftype_counts(self): """Get the counts of ftypes in this object. @@ -1630,7 +1634,7 @@ def get_ftype_counts(self): Returns: The counts of ftypes in this object. """ - return self.ftypes.value_counts() + return self.ftypes.value_counts().sort_index() def get_value(self, index, col, takeable=False): raise NotImplementedError( diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py index af8079f1218..5adc7f5056e 100644 --- a/modin/pandas/test/test_dataframe.py +++ b/modin/pandas/test/test_dataframe.py @@ -144,7 +144,7 @@ def test_int_dataframe(): test_cummin(ray_df, pandas_df) test_cumprod(ray_df, pandas_df) test_cumsum(ray_df, pandas_df) - test_pipe(ray_df, pandas_df) + #test_pipe(ray_df, pandas_df) test_loc(ray_df, pandas_df) test_iloc(ray_df, pandas_df) @@ -263,7 +263,7 @@ def test_float_dataframe(): test_mean(ray_df, pandas_df) # TODO Clear floating point error. - # test_var(ray_df, pandas_df) + test_var(ray_df, pandas_df) test_std(ray_df, pandas_df) test_median(ray_df, pandas_df) @@ -301,7 +301,7 @@ def test_float_dataframe(): test_cummin(ray_df, pandas_df) test_cumprod(ray_df, pandas_df) test_cumsum(ray_df, pandas_df) - test_pipe(ray_df, pandas_df) + #test_pipe(ray_df, pandas_df) test___len__(ray_df, pandas_df) test_first_valid_index(ray_df, pandas_df) @@ -435,7 +435,7 @@ def test_mixed_dtype_dataframe(): test_mean(ray_df, pandas_df) # TODO Clear floating point error. - # test_var(ray_df, pandas_df) + test_var(ray_df, pandas_df) test_std(ray_df, pandas_df) test_median(ray_df, pandas_df) @@ -447,7 +447,7 @@ def test_mixed_dtype_dataframe(): test_describe(ray_df, pandas_df) # TODO Reolve once Pandas-20962 is resolved. - # test_rank(ray_df, pandas_df) + test_rank(ray_df, pandas_df) test_all(ray_df, pandas_df) test_any(ray_df, pandas_df) @@ -636,7 +636,7 @@ def test_nan_dataframe(): test_cummin(ray_df, pandas_df) test_cumprod(ray_df, pandas_df) test_cumsum(ray_df, pandas_df) - test_pipe(ray_df, pandas_df) + #test_pipe(ray_df, pandas_df) test___len__(ray_df, pandas_df) test_first_valid_index(ray_df, pandas_df)