diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 583c3a27067..d7e4fa2887a 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -137,11 +137,12 @@ def _join_index_objects(self, axis, other_index, how, sort=True): return self.index.join(other_index, how=how, sort=sort) def concat(self, axis, other, **kwargs): + ignore_index = kwargs.get("ignore_index", default=False) if axis == 0: if isinstance(other, list): - return self._append_list_of_managers(other, kwargs["ignore_index"]) + return self._append_list_of_managers(other, ignore_index) else: - return self._append_data_manager(other, kwargs["ignore_index"]) + return self._append_data_manager(other, ignore_index) else: if isinstance(other, list): return self._join_list_of_managers(other, **kwargs) @@ -180,9 +181,17 @@ def _append_list_of_managers(self, others, ignore_index): return cls(new_data, new_index, joined_columns) def _join_data_manager(self, other, **kwargs): + assert isinstance(other, type(self)), \ + "This method is for data manager objects only" cls = type(self) - joined_index = self._join_index_objects(1, other.index, kwargs["how"], sort=kwargs["sort"]) + # Uses join's default value (though should not revert to default) + how = kwargs.get("how", default="left") + sort = kwargs.get("sort", default=False) + lsuffix = kwargs.get("lsuffix", default="") + rsuffix = kwargs.get("rsuffix", default="") + + joined_index = self._join_index_objects(1, other.index, how, sort=sort) to_join = other.reindex(0, joined_index).data new_self = self.reindex(0, joined_index).data @@ -193,14 +202,30 @@ def _join_data_manager(self, other, **kwargs): # suffixes. self_proxy = pandas.DataFrame(columns=self.columns) other_proxy = pandas.DataFrame(columns=other.columns) - new_columns = self_proxy.join(other_proxy, lsuffix=kwargs["lsuffix"], rsuffix=kwargs["rsuffix"]).columns + new_columns = self_proxy.join(other_proxy, lsuffix=lsuffix, rsuffix=rsuffix).columns return cls(new_data, joined_index, new_columns) def _join_list_of_managers(self, others, **kwargs): + assert isinstance(others, list), \ + "This method is for lists of DataManager objects only" + assert all(isinstance(other, type(self)) for other in others), \ + "Different Manager objects are being used. This is not allowed" + cls = type(self) + + # Uses join's default value (though should not revert to default) + how = kwargs.get("how", default="left") + sort = kwargs.get("sort", default=False) + lsuffix = kwargs.get("lsuffix", default="") + rsuffix = kwargs.get("rsuffix", default="") + + assert isinstance(others, list), \ + "This method is for lists of DataManager objects only" + assert all(isinstance(other, type(self)) for other in others), \ + "Different Manager objects are being used. This is not allowed" cls = type(self) - joined_index = self._join_index_objects(1, [other.index for other in others], kwargs["how"], sort=kwargs["sort"]) + joined_index = self._join_index_objects(1, [other.index for other in others], how, sort=sort) to_join = [other.reindex(0, joined_index).data for other in others] new_self = self.reindex(0, joined_index).data @@ -211,7 +236,7 @@ def _join_list_of_managers(self, others, **kwargs): # suffixes. self_proxy = pandas.DataFrame(columns=self.columns) others_proxy = [pandas.DataFrame(columns=other.columns) for other in others] - new_columns = self_proxy.join(others_proxy, lsuffix=kwargs["lsuffix"], rsuffix=kwargs["rsuffix"]).columns + new_columns = self_proxy.join(others_proxy, lsuffix=lsuffix, rsuffix=rsuffix).columns return cls(new_data, joined_index, new_columns) # END Append/Concat/Join (Not Merge) @@ -231,7 +256,7 @@ def inter_manager_operations(self, other, how_to_join, func): new_columns = self._join_index_objects(0, other.columns, how_to_join, sort=False) reindexed_other = other.reindex(0, joined_index).data - reindexed_self = other.reindex(0, joined_index).data + reindexed_self = self.reindex(0, joined_index).data # THere is an interesting serialization anomaly that happens if we do # not use the columns in `inter_data_op_builder` from here (e.g. if we @@ -297,7 +322,7 @@ def reindex_builer(df, axis, old_labels, new_labels, **kwargs): def reset_index(self, **kwargs): cls = type(self) - drop = kwargs["drop"] + drop = kwargs.get("drop", default=False) new_index = pandas.RangeIndex(len(self.index)) if not drop: @@ -356,31 +381,41 @@ def full_reduce(self, axis, map_func, reduce_func=None): return result def count(self, **kwargs): + axis = kwargs.get("axis", default=0) map_func = self._prepare_method(pandas.DataFrame.count, **kwargs) reduce_func = self._prepare_method(pandas.DataFrame.sum, **kwargs) - return self.full_reduce(kwargs["axis"], map_func, reduce_func) + return self.full_reduce(axis, map_func, reduce_func) def max(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.max, **kwargs) - return self.full_reduce(kwargs["axis"], func) + return self.full_reduce(axis, func) def mean(self, **kwargs): - axis = kwargs["axis"] + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) length = len(self.index) if not axis else len(self.columns) return self.sum(**kwargs) / length def min(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.min, **kwargs) - return self.full_reduce(kwargs["axis"], func) + return self.full_reduce(axis, func) def prod(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.prod, **kwargs) - return self.full_reduce(kwargs["axis"], func) + return self.full_reduce(axis, func) def sum(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.sum, **kwargs) - return self.full_reduce(kwargs["axis"], func) + return self.full_reduce(axis, func) # END Full Reduce operations # Map partitions operations @@ -448,12 +483,14 @@ def _post_process_idx_ops(self, axis, intermediate_result): return result def all(self, **kwargs): + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.all, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def any(self, **kwargs): + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.any, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def idxmax(self, **kwargs): @@ -463,11 +500,12 @@ def idxmax_builder(df, **kwargs): df.index = pandas.RangeIndex(len(df.index)) return df.idxmax(**kwargs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(idxmax_builder, **kwargs) - max_result = self.full_axis_reduce(func, kwargs["axis"]) + max_result = self.full_axis_reduce(func, axis) # Because our internal partitions don't track the external index, we # have to do a conversion. - return self._post_process_idx_ops(kwargs["axis"], max_result) + return self._post_process_idx_ops(axis, max_result) def idxmin(self, **kwargs): @@ -477,11 +515,12 @@ def idxmin_builder(df, **kwargs): df.index = pandas.RangeIndex(len(df.index)) return df.idxmin(**kwargs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(idxmin_builder, **kwargs) - min_result = self.full_axis_reduce(func, kwargs["axis"]) + min_result = self.full_axis_reduce(func, axis) # Because our internal partitions don't track the external index, we # have to do a conversion. - return self._post_process_idx_ops(kwargs["axis"], min_result) + return self._post_process_idx_ops(axis, min_result) def first_valid_index(self): @@ -512,30 +551,43 @@ def last_valid_index_builder(df): return self.index[first_result.max()] def median(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.median, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def nunique(self, **kwargs): + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.nunique, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def skew(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.skew, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def std(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.std, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def var(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.var, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def quantile_for_single_value(self, **kwargs): + axis = kwargs.get("axis", default=0) + q = kwargs.get("q", default=0.5) + assert type(q) is float + func = self._prepare_method(pandas.DataFrame.quantile, **kwargs) - result = self.full_axis_reduce(func, kwargs["axis"]) - result.name = kwargs["q"] + result = self.full_axis_reduce(func, axis) + result.name = q return result # END Column/Row partitions reduce operations @@ -566,19 +618,23 @@ def query_builder(df): def quantile_for_list_of_values(self, **kwargs): cls = type(self) - q = kwargs["q"] + axis = kwargs.get("axis", default=0) + q = kwargs.get("q", default=0.5) + assert isinstance(q, (pandas.Series, np.ndarray, pandas.Index, list)) + func = self._prepare_method(pandas.DataFrame.quantile, **kwargs) q_index = pandas.Float64Index(q) - new_data = self.map_across_full_axis(kwargs["axis"], func) - new_columns = self.columns if not kwargs["axis"] else self.index + new_data = self.map_across_full_axis(axis, func) + new_columns = self.columns if not axis else self.index return cls(new_data, q_index, new_columns) def _cumulative_builder(self, func, **kwargs): cls = type(self) + axis = kwargs.get("axis", default=0) func = self._prepare_method(func, **kwargs) - new_data = self.map_across_full_axis(kwargs["axis"], func) + new_data = self.map_across_full_axis(axis, func) return cls(new_data, self.index, self.columns) def cumsum(self, **kwargs): @@ -594,8 +650,10 @@ def cumprod(self, **kwargs): return self._cumulative_builder(pandas.DataFrame.cumprod, **kwargs) def dropna(self, **kwargs): - axis = kwargs["axis"] - subset = kwargs["subset"] + axis = kwargs.get("axis", default=0) + subset = kwargs.get("subset", default=None) + thresh = kwargs.get("thresh", default=None) + how = kwargs.get("how", default="any") # We need to subset the axis that we care about with `subset`. This # will be used to determine the number of values that are NA. if subset is not None: @@ -611,13 +669,12 @@ def dropna(self, **kwargs): # We are building this dictionary first to determine which columns # and rows to drop. This way we do not drop some columns before we # know which rows need to be dropped. - if kwargs["thresh"] is not None: + if thresh is not None: # Count the number of NA values and specify which are higher than # thresh. - thresh = kwargs["thresh"] drop_values = {ax ^ 1: compute_na.isna().sum(axis=ax ^ 1) > thresh for ax in axis} else: - drop_values = {ax ^ 1: getattr(compute_na.isna(), kwargs["how"])(axis=ax ^ 1) for ax in axis} + drop_values = {ax ^ 1: getattr(compute_na.isna(), how)(axis=ax ^ 1) for ax in axis} if 0 not in drop_values: drop_values[0] = None @@ -636,7 +693,7 @@ def dropna(self, **kwargs): def mode(self, **kwargs): cls = type(self) - axis = kwargs["axis"] + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.mode, **kwargs) new_data = self.map_across_full_axis(axis, func) @@ -657,8 +714,8 @@ def mode(self, **kwargs): def fillna(self, **kwargs): cls = type(self) - axis = kwargs["axis"] - value = kwargs["value"] + axis = kwargs.get("axis", default=0) + value = kwargs.get("value", default=None) if isinstance(value, dict): return @@ -681,7 +738,7 @@ def describe(self, **kwargs): def rank(self, **kwargs): cls = type(self) - axis = kwargs["axis"] + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.rank, **kwargs) new_data = self.map_across_full_axis(axis, func) if axis: diff --git a/modin/data_management/partitioning/partition_collections.py b/modin/data_management/partitioning/partition_collections.py index c6768835e54..80d07db39a2 100644 --- a/modin/data_management/partitioning/partition_collections.py +++ b/modin/data_management/partitioning/partition_collections.py @@ -359,8 +359,9 @@ def get_indices(self, axis=0, old_blocks=None): A Pandas Index object. """ if axis == 0: + func = self.preprocess_func(lambda df: df.index) # We grab the first column of blocks and extract the indices - new_indices = [idx.apply(lambda df: df.index).get() for idx in self.partitions.T[0]] + new_indices = [idx.apply(func).get() for idx in self.partitions.T[0]] # This is important because sometimes we have resized the data. The new # sizes will not be valid if we are trying to compute the index on a # new object that has a different length. @@ -369,7 +370,8 @@ def get_indices(self, axis=0, old_blocks=None): else: cumulative_block_lengths = np.array(self.block_lengths).cumsum() else: - new_indices = [idx.apply(lambda df: df.columns).get() for idx in self.partitions[0]] + func = self.preprocess_func(lambda df: df.columns) + new_indices = [idx.apply(func).get() for idx in self.partitions[0]] if old_blocks is not None: cumulative_block_lengths = np.array(old_blocks.block_widths).cumsum() @@ -545,6 +547,7 @@ def apply_func_to_select_indices_along_full_axis(self, axis, func, indices, keep indices = [indices] partitions_dict = self._get_dict_of_block_index(axis, indices) + preprocessed_func = self.preprocess_func(func) # Since we might be keeping the remaining blocks that are not modified, # we have to also keep the block_partitions object in the correct @@ -558,10 +561,10 @@ def apply_func_to_select_indices_along_full_axis(self, axis, func, indices, keep if not keep_remaining: # See notes in `apply_func_to_select_indices` - result = np.array([partitions_for_apply[i].apply(func, internal_indices=partitions_dict[i]) for i in partitions_dict]) + result = np.array([partitions_for_apply[i].apply(preprocessed_func, internal_indices=partitions_dict[i]) for i in partitions_dict]) else: # See notes in `apply_func_to_select_indices` - result = np.array([partitions_for_remaining[i] if i not in partitions_dict else partitions_for_apply[i].apply(func, internal_indices=partitions_dict[i]) for i in range(len(partitions_for_remaining))]) + result = np.array([partitions_for_remaining[i] if i not in partitions_dict else partitions_for_apply[i].apply(preprocessed_func, internal_indices=partitions_dict[i]) for i in range(len(partitions_for_remaining))]) return cls(result.T) if not axis else cls(result) diff --git a/modin/data_management/partitioning/remote_partition.py b/modin/data_management/partitioning/remote_partition.py index b65a6ca0197..3d86d918085 100644 --- a/modin/data_management/partitioning/remote_partition.py +++ b/modin/data_management/partitioning/remote_partition.py @@ -2,6 +2,7 @@ from __future__ import division from __future__ import print_function +import pandas import ray @@ -167,7 +168,10 @@ def to_pandas(self): Returns: A Pandas DataFrame. """ - return self.get() + dataframe = self.get() + assert type(dataframe) is pandas.DataFrame() + + return dataframe @classmethod def put(cls, obj):