From e7e78ceb504030b44713af0c2638a3aea2ff3b0b Mon Sep 17 00:00:00 2001 From: William Ma Date: Wed, 29 Aug 2018 16:54:40 -0700 Subject: [PATCH 01/28] Added type checking and changed how variables were read in from kwargs --- modin/data_management/data_manager.py | 137 +++++++++++++----- .../partitioning/partition_collections.py | 14 +- .../partitioning/remote_partition.py | 6 +- 3 files changed, 110 insertions(+), 47 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index ef27403aa5f..44050697347 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -131,11 +131,12 @@ def _join_index_objects(self, axis, other_index, how, sort=True): return self.index.join(other_index, how=how, sort=sort) def concat(self, axis, other, **kwargs): + ignore_index = kwargs.get("ignore_index", default=False) if axis == 0: if isinstance(other, list): - return self._append_list_of_managers(other, kwargs["ignore_index"]) + return self._append_list_of_managers(other, ignore_index) else: - return self._append_data_manager(other, kwargs["ignore_index"]) + return self._append_data_manager(other, ignore_index) else: if isinstance(other, list): return self._join_list_of_managers(other, **kwargs) @@ -174,9 +175,17 @@ def _append_list_of_managers(self, others, ignore_index): return cls(new_data, new_index, joined_columns) def _join_data_manager(self, other, **kwargs): + assert isinstance(other, type(self)), \ + "This method is for data manager objects only" cls = type(self) - joined_index = self._join_index_objects(1, other.index, kwargs["how"], sort=kwargs["sort"]) + # Uses join's default value (though should not revert to default) + how = kwargs.get("how", default="left") + sort = kwargs.get("sort", default=False) + lsuffix = kwargs.get("lsuffix", default="") + rsuffix = kwargs.get("rsuffix", default="") + + joined_index = self._join_index_objects(1, other.index, how, sort=sort) to_join = 
other.reindex(0, joined_index).data new_self = self.reindex(0, joined_index).data @@ -187,14 +196,30 @@ def _join_data_manager(self, other, **kwargs): # suffixes. self_proxy = pandas.DataFrame(columns=self.columns) other_proxy = pandas.DataFrame(columns=other.columns) - new_columns = self_proxy.join(other_proxy, lsuffix=kwargs["lsuffix"], rsuffix=kwargs["rsuffix"]).columns + new_columns = self_proxy.join(other_proxy, lsuffix=lsuffix, rsuffix=rsuffix).columns return cls(new_data, joined_index, new_columns) def _join_list_of_managers(self, others, **kwargs): + assert isinstance(others, list), \ + "This method is for lists of DataManager objects only" + assert all(isinstance(other, type(self)) for other in others), \ + "Different Manager objects are being used. This is not allowed" + cls = type(self) + + # Uses join's default value (though should not revert to default) + how = kwargs.get("how", default="left") + sort = kwargs.get("sort", default=False) + lsuffix = kwargs.get("lsuffix", default="") + rsuffix = kwargs.get("rsuffix", default="") + + assert isinstance(others, list), \ + "This method is for lists of DataManager objects only" + assert all(isinstance(other, type(self)) for other in others), \ + "Different Manager objects are being used. This is not allowed" cls = type(self) - joined_index = self._join_index_objects(1, [other.index for other in others], kwargs["how"], sort=kwargs["sort"]) + joined_index = self._join_index_objects(1, [other.index for other in others], how, sort=sort) to_join = [other.reindex(0, joined_index).data for other in others] new_self = self.reindex(0, joined_index).data @@ -205,7 +230,7 @@ def _join_list_of_managers(self, others, **kwargs): # suffixes. 
self_proxy = pandas.DataFrame(columns=self.columns) others_proxy = [pandas.DataFrame(columns=other.columns) for other in others] - new_columns = self_proxy.join(others_proxy, lsuffix=kwargs["lsuffix"], rsuffix=kwargs["rsuffix"]).columns + new_columns = self_proxy.join(others_proxy, lsuffix=lsuffix, rsuffix=rsuffix).columns return cls(new_data, joined_index, new_columns) # END Append/Concat/Join (Not Merge) @@ -225,7 +250,7 @@ def inter_manager_operations(self, other, how_to_join, func): new_columns = self._join_index_objects(0, other.columns, how_to_join, sort=False) reindexed_other = other.reindex(0, joined_index).data - reindexed_self = other.reindex(0, joined_index).data + reindexed_self = self.reindex(0, joined_index).data # THere is an interesting serialization anomaly that happens if we do # not use the columns in `inter_data_op_builder` from here (e.g. if we @@ -291,7 +316,7 @@ def reindex_builer(df, axis, old_labels, new_labels, **kwargs): def reset_index(self, **kwargs): cls = type(self) - drop = kwargs["drop"] + drop = kwargs.get("drop", default=False) new_index = pandas.RangeIndex(len(self.index)) if not drop: @@ -350,31 +375,41 @@ def full_reduce(self, axis, map_func, reduce_func=None): return result def count(self, **kwargs): + axis = kwargs.get("axis", default=0) map_func = self._prepare_method(pandas.DataFrame.count, **kwargs) reduce_func = self._prepare_method(pandas.DataFrame.sum, **kwargs) - return self.full_reduce(kwargs["axis"], map_func, reduce_func) + return self.full_reduce(axis, map_func, reduce_func) def max(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.max, **kwargs) - return self.full_reduce(kwargs["axis"], func) + return self.full_reduce(axis, func) def mean(self, **kwargs): - axis = kwargs["axis"] + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) length = len(self.index) if not axis 
else len(self.columns) return self.sum(**kwargs) / length def min(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.min, **kwargs) - return self.full_reduce(kwargs["axis"], func) + return self.full_reduce(axis, func) def prod(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.prod, **kwargs) - return self.full_reduce(kwargs["axis"], func) + return self.full_reduce(axis, func) def sum(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.sum, **kwargs) - return self.full_reduce(kwargs["axis"], func) + return self.full_reduce(axis, func) # END Full Reduce operations # Map partitions operations @@ -442,12 +477,14 @@ def _post_process_idx_ops(self, axis, intermediate_result): return result def all(self, **kwargs): + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.all, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def any(self, **kwargs): + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.any, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def idxmax(self, **kwargs): @@ -457,11 +494,12 @@ def idxmax_builder(df, **kwargs): df.index = pandas.RangeIndex(len(df.index)) return df.idxmax(**kwargs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(idxmax_builder, **kwargs) - max_result = self.full_axis_reduce(func, kwargs["axis"]) + max_result = self.full_axis_reduce(func, axis) # Because our internal partitions don't track the external index, we # have to do a conversion. 
- return self._post_process_idx_ops(kwargs["axis"], max_result) + return self._post_process_idx_ops(axis, max_result) def idxmin(self, **kwargs): @@ -471,11 +509,12 @@ def idxmin_builder(df, **kwargs): df.index = pandas.RangeIndex(len(df.index)) return df.idxmin(**kwargs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(idxmin_builder, **kwargs) - min_result = self.full_axis_reduce(func, kwargs["axis"]) + min_result = self.full_axis_reduce(func, axis) # Because our internal partitions don't track the external index, we # have to do a conversion. - return self._post_process_idx_ops(kwargs["axis"], min_result) + return self._post_process_idx_ops(axis, min_result) def first_valid_index(self): @@ -506,30 +545,43 @@ def last_valid_index_builder(df): return self.index[first_result.max()] def median(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.median, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def nunique(self, **kwargs): + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.nunique, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def skew(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.skew, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def std(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.std, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def var(self, **kwargs): + # Pandas default is 0 (though not mentioned in docs) + axis = kwargs.get("axis", default=0) func = 
self._prepare_method(pandas.DataFrame.var, **kwargs) - return self.full_axis_reduce(func, kwargs["axis"]) + return self.full_axis_reduce(func, axis) def quantile_for_single_value(self, **kwargs): + axis = kwargs.get("axis", default=0) + q = kwargs.get("q", default=0.5) + assert type(q) is float + func = self._prepare_method(pandas.DataFrame.quantile, **kwargs) - result = self.full_axis_reduce(func, kwargs["axis"]) - result.name = kwargs["q"] + result = self.full_axis_reduce(func, axis) + result.name = q return result # END Column/Row partitions reduce operations @@ -560,19 +612,23 @@ def query_builder(df): def quantile_for_list_of_values(self, **kwargs): cls = type(self) - q = kwargs["q"] + axis = kwargs.get("axis", default=0) + q = kwargs.get("q", default=0.5) + assert isinstance(q, (pandas.Series, np.ndarray, pandas.Index, list)) + func = self._prepare_method(pandas.DataFrame.quantile, **kwargs) q_index = pandas.Float64Index(q) - new_data = self.map_across_full_axis(kwargs["axis"], func) - new_columns = self.columns if not kwargs["axis"] else self.index + new_data = self.map_across_full_axis(axis, func) + new_columns = self.columns if not axis else self.index return cls(new_data, q_index, new_columns) def _cumulative_builder(self, func, **kwargs): cls = type(self) + axis = kwargs.get("axis", default=0) func = self._prepare_method(func, **kwargs) - new_data = self.map_across_full_axis(kwargs["axis"], func) + new_data = self.map_across_full_axis(axis, func) return cls(new_data, self.index, self.columns) def cumsum(self, **kwargs): @@ -588,8 +644,10 @@ def cumprod(self, **kwargs): return self._cumulative_builder(pandas.DataFrame.cumprod, **kwargs) def dropna(self, **kwargs): - axis = kwargs["axis"] - subset = kwargs["subset"] + axis = kwargs.get("axis", default=0) + subset = kwargs.get("subset", default=None) + thresh = kwargs.get("thresh", default=None) + how = kwargs.get("how", default="any") # We need to subset the axis that we care about with `subset`. 
This # will be used to determine the number of values that are NA. if subset is not None: @@ -605,13 +663,12 @@ def dropna(self, **kwargs): # We are building this dictionary first to determine which columns # and rows to drop. This way we do not drop some columns before we # know which rows need to be dropped. - if kwargs["thresh"] is not None: + if thresh is not None: # Count the number of NA values and specify which are higher than # thresh. - thresh = kwargs["thresh"] drop_values = {ax ^ 1: compute_na.isna().sum(axis=ax ^ 1) > thresh for ax in axis} else: - drop_values = {ax ^ 1: getattr(compute_na.isna(), kwargs["how"])(axis=ax ^ 1) for ax in axis} + drop_values = {ax ^ 1: getattr(compute_na.isna(), how)(axis=ax ^ 1) for ax in axis} if 0 not in drop_values: drop_values[0] = None @@ -630,7 +687,7 @@ def dropna(self, **kwargs): def mode(self, **kwargs): cls = type(self) - axis = kwargs["axis"] + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.mode, **kwargs) new_data = self.map_across_full_axis(axis, func) @@ -651,8 +708,8 @@ def mode(self, **kwargs): def fillna(self, **kwargs): cls = type(self) - axis = kwargs["axis"] - value = kwargs["value"] + axis = kwargs.get("axis", default=0) + value = kwargs.get("value", default=None) if isinstance(value, dict): return @@ -675,7 +732,7 @@ def describe(self, **kwargs): def rank(self, **kwargs): cls = type(self) - axis = kwargs["axis"] + axis = kwargs.get("axis", default=0) func = self._prepare_method(pandas.DataFrame.rank, **kwargs) new_data = self.map_across_full_axis(axis, func) if axis: diff --git a/modin/data_management/partitioning/partition_collections.py b/modin/data_management/partitioning/partition_collections.py index 48312d1171c..2af2a300148 100644 --- a/modin/data_management/partitioning/partition_collections.py +++ b/modin/data_management/partitioning/partition_collections.py @@ -332,8 +332,7 @@ def to_pandas(self, is_transposed=False): @classmethod def from_pandas(cls, 
dataframe, num_splits): if num_splits is None: - from ...pandas import DEFAULT_NPARTITIONS - num_splits = DEFAULT_NPARTITIONS + num_splits = cls._compute_num_partitions() pass @@ -353,8 +352,9 @@ def get_indices(self, axis=0, old_blocks=None): A Pandas Index object. """ if axis == 0: + func = self.preprocess_func(lambda df: df.index) # We grab the first column of blocks and extract the indices - new_indices = [idx.apply(lambda df: df.index).get() for idx in self.partitions.T[0]] + new_indices = [idx.apply(func).get() for idx in self.partitions.T[0]] # This is important because sometimes we have resized the data. The new # sizes will not be valid if we are trying to compute the index on a # new object that has a different length. @@ -363,7 +363,8 @@ def get_indices(self, axis=0, old_blocks=None): else: cumulative_block_lengths = np.array(self.block_lengths).cumsum() else: - new_indices = [idx.apply(lambda df: df.columns).get() for idx in self.partitions[0]] + func = self.preprocess_func(lambda df: df.columns) + new_indices = [idx.apply(func).get() for idx in self.partitions[0]] if old_blocks is not None: cumulative_block_lengths = np.array(old_blocks.block_widths).cumsum() @@ -538,6 +539,7 @@ def apply_func_to_select_indices_along_full_axis(self, axis, func, indices, keep indices = [indices] partitions_dict = self._get_dict_of_block_index(axis, indices) + preprocessed_func = self.preprocess_func(func) # Since we might be keeping the remaining blocks that are not modified, # we have to also keep the block_partitions object in the correct @@ -551,10 +553,10 @@ def apply_func_to_select_indices_along_full_axis(self, axis, func, indices, keep if not keep_remaining: # See notes in `apply_func_to_select_indices` - result = np.array([partitions_for_apply[i].apply(func, internal_indices=partitions_dict[i]) for i in partitions_dict]) + result = np.array([partitions_for_apply[i].apply(preprocessed_func, internal_indices=partitions_dict[i]) for i in partitions_dict]) else: # 
See notes in `apply_func_to_select_indices` - result = np.array([partitions_for_remaining[i] if i not in partitions_dict else partitions_for_apply[i].apply(func, internal_indices=partitions_dict[i]) for i in range(len(partitions_for_remaining))]) + result = np.array([partitions_for_remaining[i] if i not in partitions_dict else partitions_for_apply[i].apply(preprocessed_func, internal_indices=partitions_dict[i]) for i in range(len(partitions_for_remaining))]) return cls(result.T) if not axis else cls(result) diff --git a/modin/data_management/partitioning/remote_partition.py b/modin/data_management/partitioning/remote_partition.py index b65a6ca0197..3d86d918085 100644 --- a/modin/data_management/partitioning/remote_partition.py +++ b/modin/data_management/partitioning/remote_partition.py @@ -2,6 +2,7 @@ from __future__ import division from __future__ import print_function +import pandas import ray @@ -167,7 +168,10 @@ def to_pandas(self): Returns: A Pandas DataFrame. """ - return self.get() + dataframe = self.get() + assert type(dataframe) is pandas.DataFrame + + return dataframe @classmethod def put(cls, obj): From cf9b05d472fbe87071c37705b2f3c49843aba614 Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 30 Aug 2018 16:26:28 -0700 Subject: [PATCH 02/28] Updated sample to new architecture --- modin/pandas/dataframe.py | 41 +++++++++------------------------------ 1 file changed, 9 insertions(+), 32 deletions(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 04920d8f609..57aafed47b2 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -3303,9 +3303,11 @@ def sample(self, else 0 if axis == 0: - axis_length = len(self._row_metadata) + axis_labels = self._data_manager.index + axis_length = len(axis_labels) else: - axis_length = len(self._col_metadata) + axis_labels = self._data_manager.columns + axis_length = len(axis_labels) if weights is not None: @@ -3383,15 +3385,6 @@ def sample(self, columns=[] if axis == 1 else 
self.columns, index=self.index if axis == 1 else []) - if axis == 1: - axis_labels = self.columns - partition_metadata = self._col_metadata - partitions = self._col_partitions - else: - axis_labels = self.index - partition_metadata = self._row_metadata - partitions = self._row_partitions - if random_state is not None: # Get a random number generator depending on the type of # random_state that is passed in @@ -3407,35 +3400,19 @@ def sample(self, # choose random numbers and then get corresponding labels from # chosen axis sample_indices = random_num_gen.randint( - low=0, high=len(partition_metadata), size=n) + low=0, high=axis_length, size=n) samples = axis_labels[sample_indices] else: # randomly select labels from chosen axis samples = np.random.choice( a=axis_labels, size=n, replace=replace, p=weights) - # create an array of (partition, index_within_partition) tuples for - # each sample - part_ind_tuples = [partition_metadata[sample] for sample in samples] - if axis == 1: - # tup[0] refers to the partition number and tup[1] is the index - # within that partition - new_cols = [ - _deploy_func.remote(lambda df: df.iloc[:, [tup[1]]], - partitions[tup[0]]) - for tup in part_ind_tuples - ] - return DataFrame( - col_partitions=new_cols, columns=samples, index=self.index) + data_manager = self._data_manager.getitem_col_array(samples) + return DataFrame(data_manager=data_manager) else: - new_rows = [ - _deploy_func.remote(lambda df: df.loc[[tup[1]]], - partitions[tup[0]]) - for tup in part_ind_tuples - ] - return DataFrame( - row_partitions=new_rows, columns=self.columns, index=samples) + data_manager = self._data_manager.getitem_row_array(samples) + return DataFrame(data_manager=data_manager) def select(self, crit, axis=0): raise NotImplementedError( From 872495629790538a40e0423afabc0b8f501cf6f5 Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 30 Aug 2018 18:56:22 -0700 Subject: [PATCH 03/28] Made test_sample more rigourous --- modin/pandas/test/test_dataframe.py | 
21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py index 99db0b9903c..611cc322321 100644 --- a/modin/pandas/test/test_dataframe.py +++ b/modin/pandas/test/test_dataframe.py @@ -63,6 +63,7 @@ def test_int_dataframe(): filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'} + test_sample(ray_df, pandas_df) test_filter(ray_df, pandas_df, filter_by) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -233,6 +234,7 @@ def test_float_dataframe(): filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'} + test_sample(ray_df, pandas_df) test_filter(ray_df, pandas_df, filter_by) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -402,6 +404,7 @@ def test_mixed_dtype_dataframe(): filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'} + test_sample(ray_df, pandas_df) test_filter(ray_df, pandas_df, filter_by) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -569,6 +572,7 @@ def test_nan_dataframe(): filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'} + test_sample(ray_df, pandas_df) test_filter(ray_df, pandas_df, filter_by) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -2865,10 +2869,19 @@ def test_rtruediv(): test_inter_df_math_right_ops("rtruediv") -def test_sample(): - ray_df = create_test_dataframe() - assert len(ray_df.sample(n=4)) == 4 - assert len(ray_df.sample(frac=0.5)) == 2 +@pytest.fixture +def test_sample(ray_df, pd_df): + with pytest.raises(ValueError): + ray_df.sample(n=3, frac=0.4) + + assert ray_df_equals_pandas( + ray_df.sample(frac=0.5, random_state=42), + pd_df.sample(frac=0.5, random_state=42) + ) + assert ray_df_equals_pandas( + ray_df.sample(n=2, random_state=42), + pd_df.sample(n=2, random_state=42) + ) def test_select(): From 2ad1c3b3961f72f0338ed1b2f2fd7fc28fe1f9e7 Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 30 Aug 
2018 19:41:35 -0700 Subject: [PATCH 04/28] Removed 'default=' from kwargs.get's --- modin/data_management/data_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 6c80861fc19..43c4e922574 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -388,13 +388,13 @@ def count(self, **kwargs): def max(self, **kwargs): # Pandas default is 0 (though not mentioned in docs) - axis = kwargs.get("axis", default=0) + axis = kwargs.get("axis", 0) func = self._prepare_method(pandas.DataFrame.max, **kwargs) return self.full_reduce(axis, func) def mean(self, **kwargs): # Pandas default is 0 (though not mentioned in docs) - axis = kwargs.get("axis", default=0) + axis = kwargs.get("axis", 0) length = len(self.index) if not axis else len(self.columns) return self.sum(**kwargs) / length From 502bd0dd8d0252b2e9db4fe437dfb70d048a6e92 Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 30 Aug 2018 20:44:25 -0700 Subject: [PATCH 05/28] Updated eval to the new backend --- modin/data_management/data_manager.py | 29 ++++++++++++++++++++++- modin/pandas/dataframe.py | 33 +++++---------------------- 2 files changed, 34 insertions(+), 28 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 43c4e922574..770399399f2 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -599,7 +599,7 @@ def query(self, expr, **kwargs): cls = type(self) columns = self.columns - def query_builder(df): + def query_builder(df, **kwargs): # This is required because of an Arrow limitation # TODO revisit for Arrow error df = df.copy() @@ -616,6 +616,33 @@ def query_builder(df): return cls(new_data, new_index, self.columns) + def eval(self, expr, **kwargs): + cls = type(self) + columns = self.columns + + def eval_builder(df, **kwargs): + df.columns = columns + result = 
df.eval(expr, inplace=False, **kwargs) + # If result is a series, expr was not an assignment expression. + if not isinstance(result, pandas.Series): + result.columns = pandas.RangeIndex(0, len(result.columns)) + return result + + func = self._prepare_method(eval_builder, **kwargs) + new_data = self.map_across_full_axis(1, func) + + # eval can update the columns, so we must update columns + columns_copy = pandas.DataFrame(columns=columns) + columns_copy = columns_copy.eval(expr, inplace=False, **kwargs) + if isinstance(columns_copy, pandas.Series): + # To create a data manager, we need the + # columns to be in a list-like + columns = list(columns_copy.name) + else: + columns = columns_copy.columns + + return cls(new_data, self.index, columns) + def quantile_for_list_of_values(self, **kwargs): cls = type(self) axis = kwargs.get("axis", 0) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 6b983d99f9f..e65c890f7fc 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1525,36 +1525,14 @@ def eval(self, expr, inplace=False, **kwargs): ndarray, numeric scalar, DataFrame, Series """ self._validate_eval_query(expr, **kwargs) - - columns = self.columns - - def eval_helper(df): - df.columns = columns - result = df.eval(expr, inplace=False, **kwargs) - # If result is a series, expr was not an assignment expression. 
- if not isinstance(result, pandas.Series): - result.columns = pandas.RangeIndex(0, len(result.columns)) - return result - inplace = validate_bool_kwarg(inplace, "inplace") - new_rows = _map_partitions(eval_helper, self._row_partitions) - result_type = ray.get( - _deploy_func.remote(lambda df: type(df), new_rows[0])) - if result_type is pandas.Series: - new_series = pandas.concat(ray.get(new_rows), axis=0, copy=False) - new_series.index = self.index - return new_series - - columns_copy = self._col_metadata._coord_df.copy().T - columns_copy.eval(expr, inplace=True, **kwargs) - columns = columns_copy.columns + data_manager = self._data_manager.eval(expr, **kwargs) if inplace: - self._update_inplace( - row_partitions=new_rows, columns=columns, index=self.index) + self._update_inplace(new_manager=data_manager) else: - return DataFrame(columns=columns, row_partitions=new_rows) + return DataFrame(data_manager=data_manager) def ewm(self, com=None, @@ -2877,6 +2855,7 @@ def query(self, expr, inplace=False, **kwargs): A new DataFrame if inplace=False """ self._validate_eval_query(expr, **kwargs) + inplace = validate_bool_kwarg(inplace, "inplace") new_manager = self._data_manager.query(expr, **kwargs) @@ -3320,8 +3299,8 @@ def sample(self, # choose random numbers and then get corresponding labels from # chosen axis - sample_indices = random_num_gen.randint( - low=0, high=axis_length, size=n) + sample_indices = random_num_gen.choice( + np.arange(0, axis_length), size=n, replace=replace) samples = axis_labels[sample_indices] else: # randomly select labels from chosen axis From 992a8e2ff8f8100240c26f99a8979beecd478eb4 Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 30 Aug 2018 20:44:37 -0700 Subject: [PATCH 06/28] Added two more tests for eval --- modin/pandas/test/test_dataframe.py | 30 +++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py index 611cc322321..9ae6c2c7ced 100644 
--- a/modin/pandas/test/test_dataframe.py +++ b/modin/pandas/test/test_dataframe.py @@ -1533,6 +1533,18 @@ def test_eval_df_use_case(): frame_data = {'a': np.random.randn(10), 'b': np.random.randn(10)} df = pandas.DataFrame(frame_data) ray_df = pd.DataFrame(frame_data) + + # Very hacky test to test eval while inplace is not working + tmp_pandas = df.eval( + "e = arctan2(sin(a), b)", + engine='python', + parser='pandas') + tmp_ray = ray_df.eval( + "e = arctan2(sin(a), b)", + engine='python', + parser='pandas') + assert ray_df_equals_pandas(tmp_ray, tmp_pandas) + df.eval( "e = arctan2(sin(a), b)", engine='python', @@ -1559,6 +1571,24 @@ def test_eval_df_arithmetic_subexpression(): assert ray_df_equals_pandas(ray_df, df) +def test_eval_df_series_result(): + frame_data = {'a': np.random.randn(10), 'b': np.random.randn(10)} + df = pandas.DataFrame(frame_data) + ray_df = pd.DataFrame(frame_data) + + # Very hacky test to test eval while inplace is not working + tmp_pandas = df.eval( + "arctan2(sin(a), b)", + engine='python', + parser='pandas') + tmp_ray = ray_df.eval( + "arctan2(sin(a), b)", + engine='python', + parser='pandas') + assert ray_df_equals_pandas(tmp_ray, tmp_pandas) + assert isinstance(to_pandas(tmp_ray), pandas.Series) + + def test_ewm(): ray_df = create_test_dataframe() From 7cbb17a800abff57e4d972fd5c54bccd89484ec2 Mon Sep 17 00:00:00 2001 From: William Ma Date: Fri, 31 Aug 2018 17:40:42 -0700 Subject: [PATCH 07/28] Updated memory_usage to new backend --- modin/pandas/dataframe.py | 4 ++-- modin/pandas/test/test_dataframe.py | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index e65c890f7fc..dea18dc4aef 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -2326,11 +2326,11 @@ def memory_usage(self, index=True, deep=False): def remote_func(df): return df.memory_usage(index=False, deep=deep) - result = self._map_reduce(axis=0, map_func=remote_func) + result = 
self._data_manager.full_reduce(axis=0, map_func=remote_func) result.index = self.columns if index: - index_value = self._row_metadata.index.memory_usage(deep=deep) + index_value = self.index.memory_usage(deep=deep) return pandas.Series(index_value, index=['Index']).append(result) return result diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py index 9ae6c2c7ced..e4327b11b46 100644 --- a/modin/pandas/test/test_dataframe.py +++ b/modin/pandas/test/test_dataframe.py @@ -2279,8 +2279,9 @@ def test_melt(): ray_df.melt() -@pytest.fixture -def test_memory_usage(ray_df): +#@pytest.fixture +def test_memory_usage(): + ray_df = create_test_dataframe() assert type(ray_df.memory_usage()) is pandas.core.series.Series assert ray_df.memory_usage(index=True).at['Index'] is not None assert ray_df.memory_usage(deep=True).sum() >= \ From b144b3d853ca68709a46dd0c931a9cfa8ddb73ea Mon Sep 17 00:00:00 2001 From: William Ma Date: Sat, 1 Sep 2018 18:12:20 -0700 Subject: [PATCH 08/28] Updated info and memory_usage to the new backend --- modin/pandas/dataframe.py | 149 ++++++++++++++++++++++---------------- 1 file changed, 87 insertions(+), 62 deletions(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index dea18dc4aef..fce288f4883 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1896,85 +1896,110 @@ def infer_objects(self): "To contribute to Pandas on Ray, please visit " "github.com/modin-project/modin.") - def info(self, - verbose=None, - buf=None, - max_cols=None, - memory_usage=None, - null_counts=None): - def info_helper(df): - output_buffer = io.StringIO() - df.info( - verbose=verbose, - buf=output_buffer, - max_cols=max_cols, - memory_usage=memory_usage, - null_counts=null_counts) - return output_buffer.getvalue() - - # Combine the per-partition info and split into lines - result = ''.join( - ray.get(_map_partitions(info_helper, self._col_partitions))) - lines = result.split('\n') + def info(self, + 
verbose=None, + buf=None, + max_cols=None, + memory_usage=None, + null_counts=None): + """Print a concise summary of a DataFrame, which includes the index + dtype and column dtypes, non-null values and memory usage. + Args: + verbose (bool, optional): Whether to print the full summary. Defaults + to True + + buf (writable buffer): Where to send output. Defaults to sys.stdout + + max_cols (int, optional): When to switch from verbose to truncated + output. By default, this is 100. + + memory_usage (bool, str, optional): Specifies whether the total memory + usage of the DataFrame elements (including index) should be displayed. + True always shows memory usage. False never shows memory usage. A value + of 'deep' is equivalent to "True with deep introspection". Memory usage + is shown in human-readable units (base-2 representation). Without deep + introspection a memory estimation is made based on column dtype and number + of rows assuming values consume the same memory amount for corresponding + dtypes. With deep memory introspection, a real memory usage calculation is + performed at the cost of computational resources. Defaults to True. + + null_counts (bool, optional): Whether to show the non-null counts. By default, + this is shown only when the frame is smaller than 100 columns and 1690785 + rows. A value of True always shows the counts and False never shows the + counts. + + Returns: + Prints the summary of a DataFrame and returns None. 
+ """ + index = self._data_manager.index + columns = self._data_manager.columns + dtypes = self.dtypes + + # Set up default values + verbose = True if verbose is None else verbose + buf = sys.stdout if not buf else buf + max_cols = 100 if not max_cols else max_cols + memory_usage = True if memory_usage is None else memory_usage + if not null_counts: + if len(columns) < 100 and len(index) < 1690785: + null_counts = True + else: + null_counts = False + + # Determine if actually verbose + actually_verbose = True if verbose and max_cols > len(columns) else False + + if type(memory_usage) == str and memory_usage == 'deep': + memory_usage_deep = True + else: + memory_usage_deep = False + + # Start putting together output # Class denoted in info() output class_string = '\n' # Create the Index info() string by parsing self.index - index_string = self.index.summary() + '\n' - - # A column header is needed in the inf() output - col_header = 'Data columns (total {0} columns):\n' \ - .format(len(self.columns)) - - # Parse the per-partition values to get the per-column details - # Find all the lines in the output that start with integers - prog = re.compile('^[0-9]+.+') - col_lines = [prog.match(line) for line in lines] - cols = [c.group(0) for c in col_lines if c is not None] - # replace the partition columns names with real column names - columns = [ - "{0}\t{1}\n".format(self.columns[i], cols[i].split(" ", 1)[1]) - for i in range(len(cols)) - ] - col_string = ''.join(columns) + '\n' + index_string = index.summary() + '\n' + + if actually_verbose: + if null_counts: + counts = self.count() + # Create string for verbose output + col_string = 'Data columns (total {0} columns):\n' \ + .format(len(columns)) + for col, dtype in zip(columns, dtypes): + col_string += '{0}\t'.format(col) + if null_counts: + col_string += '{0} not-null '.format(counts[col]) + col_string += '{0}\n'.format(dtype) + else: + # Create string for not verbose output + col_string = 'Columns: {0} entries, {1} 
to {2}\n'\ + .format(len(columns), columns[0], columns[-1]) # A summary of the dtypes in the dataframe dtypes_string = "dtypes: " - for dtype, count in self.dtypes.value_counts().iteritems(): + for dtype, count in dtypes.value_counts().iteritems(): dtypes_string += "{0}({1}),".format(dtype, count) dtypes_string = dtypes_string[:-1] + '\n' - # Compute the memory usage by summing per-partitions return values - # Parse lines for memory usage number - prog = re.compile('^memory+.+') - mems = [prog.match(line) for line in lines] - mem_vals = [ - float(re.search(r'\d+', m.group(0)).group()) for m in mems - if m is not None - ] - - memory_string = "" - - if len(mem_vals) != 0: - # Sum memory usage from each partition - if memory_usage != 'deep': - memory_string = 'memory usage: {0}+ bytes' \ - .format(sum(mem_vals)) + # Create memory usage string + memory_string = '' + if memory_usage: + mem_data = self.memory_usage(deep=memory_usage_deep) + if memory_usage_deep: + memory_string = 'memory usage: {0} bytes'.format(mem_data.sum()) else: - memory_string = 'memory usage: {0} bytes'.format(sum(mem_vals)) + memory_string = 'memory usage: {0}+ bytes'.format(mem_data.sum()) # Combine all the components of the info() output result = ''.join([ - class_string, index_string, col_header, col_string, dtypes_string, - memory_string - ]) + class_string, index_string, col_string, dtypes_string, memory_string + ]) # Write to specified output buffer - if buf: - buf.write(result) - else: - sys.stdout.write(result) + buf.write(result) def insert(self, loc, column, value, allow_duplicates=False): """Insert column into DataFrame at specified location. 
From 4e7addaf6f4da182018e14e7db837563195f639d Mon Sep 17 00:00:00 2001 From: William Ma Date: Sat, 1 Sep 2018 18:12:49 -0700 Subject: [PATCH 09/28] Updated info and memory_usage to be standalone tests and updated the tests --- modin/pandas/test/test_dataframe.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py index e4327b11b46..933d975e0da 100644 --- a/modin/pandas/test/test_dataframe.py +++ b/modin/pandas/test/test_dataframe.py @@ -3,6 +3,7 @@ from __future__ import print_function import pytest +import io import numpy as np import pandas import pandas.util.testing as tm @@ -2091,12 +2092,20 @@ def test_infer_objects(): ray_df.infer_objects() -@pytest.fixture -def test_info(ray_df): - info_string = ray_df.info() - assert '\n' in info_string - info_string = ray_df.info(memory_usage=True) - assert 'memory_usage: ' in info_string +#@pytest.fixture +def test_info(): + ray_df = create_test_dataframe() + with io.StringIO() as buf: + ray_df.info(buf=buf) + info_string = buf.getvalue() + assert '\n' in info_string + assert 'memory usage: ' in info_string + assert 'Data columns (total 5 columns):' in info_string + with io.StringIO() as buf: + ray_df.info(buf=buf, verbose=False, memory_usage=False) + info_string = buf.getvalue() + assert 'memory usage: ' not in info_string + assert 'Columns: 5 entries, col1 to col5' in info_string @pytest.fixture From 8a69de5393f41b430f06b22cb7d1e13dbf2fc0fe Mon Sep 17 00:00:00 2001 From: William Ma Date: Sat, 1 Sep 2018 20:25:18 -0700 Subject: [PATCH 10/28] Updated info to do only one pass --- modin/pandas/dataframe.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index fce288f4883..b497de43148 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1962,12 +1962,22 @@ def info(self, # Create the Index info() string by 
parsing self.index index_string = index.summary() + '\n' - if actually_verbose: + # Package everything into one helper + def info_helper(df): + result = pandas.DataFrame() + if memory_usage: + result['memory'] = df.memory_usage(index=False, deep=memory_usage_deep) if null_counts: - counts = self.count() + result['count'] = df.count() + return result + helper_result = self._data_manager.full_reduce(axis=0, map_func=info_helper) + + if actually_verbose: # Create string for verbose output col_string = 'Data columns (total {0} columns):\n' \ .format(len(columns)) + if null_counts: + counts = helper_result['count'] for col, dtype in zip(columns, dtypes): col_string += '{0}\t'.format(col) if null_counts: @@ -1987,7 +1997,7 @@ def info(self, # Create memory usage string memory_string = '' if memory_usage: - mem_data = self.memory_usage(deep=memory_usage_deep) + mem_data = helper_result['memory'] if memory_usage_deep: memory_string = 'memory usage: {0} bytes'.format(mem_data.sum()) else: From 0ea925f99c7e7cff5038d7d9b24a5ee426863804 Mon Sep 17 00:00:00 2001 From: William Ma Date: Sat, 1 Sep 2018 22:08:51 -0700 Subject: [PATCH 11/28] Updated info to do everything in one run with DataFrame --- modin/pandas/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index b497de43148..9a257a32818 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1968,7 +1968,7 @@ def info_helper(df): if memory_usage: result['memory'] = df.memory_usage(index=False, deep=memory_usage_deep) if null_counts: - result['count'] = df.count() + result['count'] = df.count(axis=0) return result helper_result = self._data_manager.full_reduce(axis=0, map_func=info_helper) From 8a7b320f36b8b75570a059c66ba1f0c549c99a42 Mon Sep 17 00:00:00 2001 From: William Ma Date: Sat, 1 Sep 2018 22:12:42 -0700 Subject: [PATCH 12/28] Update info to do everything in one run with Series --- modin/pandas/dataframe.py | 17 
++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 9a257a32818..7675c028e86 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1964,24 +1964,28 @@ def info(self, # Package everything into one helper def info_helper(df): - result = pandas.DataFrame() + result = pandas.Series() if memory_usage: - result['memory'] = df.memory_usage(index=False, deep=memory_usage_deep) + memory = df.memory_usage(index=False, deep=memory_usage_deep) + memory = memory.add_suffix("_memory") + result = result.append(memory) if null_counts: - result['count'] = df.count(axis=0) + count = df.count(axis=0) + count = count.add_suffix("_count") + result = result.append(count) return result helper_result = self._data_manager.full_reduce(axis=0, map_func=info_helper) + counts = helper_result.filter(regex='_count', axis=0) + mem_data = helper_result.filter(regex='_memory', axis=0) if actually_verbose: # Create string for verbose output col_string = 'Data columns (total {0} columns):\n' \ .format(len(columns)) - if null_counts: - counts = helper_result['count'] for col, dtype in zip(columns, dtypes): col_string += '{0}\t'.format(col) if null_counts: - col_string += '{0} not-null '.format(counts[col]) + col_string += '{0} not-null '.format(counts[col+"_count"]) col_string += '{0}\n'.format(dtype) else: # Create string for not verbose output @@ -1997,7 +2001,6 @@ def info_helper(df): # Create memory usage string memory_string = '' if memory_usage: - mem_data = helper_result['memory'] if memory_usage_deep: memory_string = 'memory usage: {0} bytes'.format(mem_data.sum()) else: From 8585f8f5f570ed84aafe6d758f7218b67a4d0a2f Mon Sep 17 00:00:00 2001 From: William Ma Date: Sat, 1 Sep 2018 22:08:51 -0700 Subject: [PATCH 13/28] Updated info to do everything in one run with DataFrame --- modin/pandas/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index b497de43148..9a257a32818 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1968,7 +1968,7 @@ def info_helper(df): if memory_usage: result['memory'] = df.memory_usage(index=False, deep=memory_usage_deep) if null_counts: - result['count'] = df.count() + result['count'] = df.count(axis=0) return result helper_result = self._data_manager.full_reduce(axis=0, map_func=info_helper) From e27328811230db8a7523a2206de1716a47cf8c0c Mon Sep 17 00:00:00 2001 From: William Ma Date: Sat, 1 Sep 2018 23:34:13 -0700 Subject: [PATCH 14/28] Updated to get everything working and moved appropriate parts to DataManager --- modin/data_management/data_manager.py | 29 ++++++++++++++++ modin/pandas/dataframe.py | 48 +++++++++++++++++---------- modin/pandas/test/test_dataframe.py | 12 +++++-- 3 files changed, 69 insertions(+), 20 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 770399399f2..55dd28d5098 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -522,6 +522,27 @@ def idxmin_builder(df, **kwargs): # have to do a conversion. 
return self._post_process_idx_ops(axis, min_result) + def info(self, **kwargs): + def info_builder(df, **kwargs): + result = pandas.DataFrame() + if memory_usage: + result['memory'] = df.memory_usage(index=False, deep=memory_usage_deep) + if null_counts: + result['count'] = df.count(axis=0) + return result + + memory_usage = kwargs.get('memory_usage', True) + null_counts = kwargs.get('null_counts', True) + + if type(memory_usage) == str and memory_usage == 'deep': + memory_usage_deep = True + else: + memory_usage_deep = False + + func = self._prepare_method(info_builder, **kwargs) + return self.full_axis_reduce(func, 0) + + def first_valid_index(self): # It may be possible to incrementally check each partition, but this @@ -556,6 +577,14 @@ def median(self, **kwargs): func = self._prepare_method(pandas.DataFrame.median, **kwargs) return self.full_axis_reduce(func, axis) + def memory_usage(self, **kwargs): + def memory_usage_builder(df, **kwargs): + return df.memory_usage(index=False, deep=deep) + + deep = kwargs.get('deep', False) + func = self._prepare_method(memory_usage_builder, **kwargs) + return self.full_axis_reduce(func, 0) + def nunique(self, **kwargs): axis = kwargs.get("axis", 0) func = self._prepare_method(pandas.DataFrame.nunique, **kwargs) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 9a257a32818..11fb03209c2 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1962,26 +1962,31 @@ def info(self, # Create the Index info() string by parsing self.index index_string = index.summary() + '\n' - # Package everything into one helper - def info_helper(df): - result = pandas.DataFrame() - if memory_usage: - result['memory'] = df.memory_usage(index=False, deep=memory_usage_deep) + if memory_usage or null_counts: + results_data = self._data_manager.info( + verbose=actually_verbose, + buf=buf, + max_cols=max_cols, + memory_usage=memory_usage, + null_counts=null_counts + ) if null_counts: - result['count'] = 
df.count(axis=0) - return result - helper_result = self._data_manager.full_reduce(axis=0, map_func=info_helper) + # For some reason, the counts table has a shape of (columns, columns) + counts = results_data['count'] + counts.columns = columns + if memory_usage: + # For some reason, the memory table has a shape of (columns, columns) + # but it doesn't matter because the cells not on the diagonal are NaN + memory_usage_data = results_data['memory'].sum() + index.memory_usage(deep=memory_usage_deep) if actually_verbose: # Create string for verbose output col_string = 'Data columns (total {0} columns):\n' \ .format(len(columns)) - if null_counts: - counts = helper_result['count'] for col, dtype in zip(columns, dtypes): col_string += '{0}\t'.format(col) if null_counts: - col_string += '{0} not-null '.format(counts[col]) + col_string += '{0} not-null '.format(counts.loc[col, col]) col_string += '{0}\n'.format(dtype) else: # Create string for not verbose output @@ -1997,11 +2002,10 @@ def info_helper(df): # Create memory usage string memory_string = '' if memory_usage: - mem_data = helper_result['memory'] if memory_usage_deep: - memory_string = 'memory usage: {0} bytes'.format(mem_data.sum()) + memory_string = 'memory usage: {0} bytes'.format(memory_usage_data) else: - memory_string = 'memory usage: {0}+ bytes'.format(mem_data.sum()) + memory_string = 'memory usage: {0}+ bytes'.format(memory_usage_data) # Combine all the components of the info() output result = ''.join([ @@ -2358,10 +2362,20 @@ def melt(self, "github.com/modin-project/modin.") def memory_usage(self, index=True, deep=False): - def remote_func(df): - return df.memory_usage(index=False, deep=deep) + """Returns the memory usage of each column in bytes - result = self._data_manager.full_reduce(axis=0, map_func=remote_func) + Args: + index (bool): Whether to include the memory usage of the DataFrame's + index in returned Series. 
Defaults to True + deep (bool): If True, introspect the data deeply by interrogating + objects dtypes for system-level memory consumption. Defaults to False + + Returns: + A Series where the index are the column names and the values are + the memory usage of each of the columns in bytes. If `index=true`, + then the first value of the Series will be 'Index' with its memory usage. + """ + result = self._data_manager.memory_usage(index=index, deep=deep) result.index = self.columns if index: diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py index 933d975e0da..8fa5ca258e8 100644 --- a/modin/pandas/test/test_dataframe.py +++ b/modin/pandas/test/test_dataframe.py @@ -2094,18 +2094,24 @@ def test_infer_objects(): #@pytest.fixture def test_info(): - ray_df = create_test_dataframe() + ray_df = pd.DataFrame({ + 'col1': [1, 2, 3, np.nan], + 'col2': [4, 5, np.nan, 7], + 'col3': [8, np.nan, 10, 11], + 'col4': [np.nan, 13, 14, 15] + }) + ray_df.info(memory_usage='deep') with io.StringIO() as buf: ray_df.info(buf=buf) info_string = buf.getvalue() assert '\n' in info_string assert 'memory usage: ' in info_string - assert 'Data columns (total 5 columns):' in info_string + assert 'Data columns (total 4 columns):' in info_string with io.StringIO() as buf: ray_df.info(buf=buf, verbose=False, memory_usage=False) info_string = buf.getvalue() assert 'memory usage: ' not in info_string - assert 'Columns: 5 entries, col1 to col5' in info_string + assert 'Columns: 4 entries, col1 to col4' in info_string @pytest.fixture From 5d52f7f22e2da5b3292e0e173b3951639ac72eb7 Mon Sep 17 00:00:00 2001 From: William Ma Date: Wed, 5 Sep 2018 22:24:57 -0700 Subject: [PATCH 15/28] Removed extraneous print statement --- modin/data_management/partitioning/partition_collections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/data_management/partitioning/partition_collections.py b/modin/data_management/partitioning/partition_collections.py index 
1d43a9aeca5..a3dffe4a373 100644 --- a/modin/data_management/partitioning/partition_collections.py +++ b/modin/data_management/partitioning/partition_collections.py @@ -318,7 +318,7 @@ def to_pandas(self, is_transposed=False): return self.transpose().to_pandas(False).T else: retrieved_objects = [[obj.to_pandas() for obj in part] for part in self.partitions] - print(retrieved_objects[0]) + #print(retrieved_objects[0]) if all(isinstance(part, pandas.Series) for row in retrieved_objects for part in row): axis = 0 elif all(isinstance(part, pandas.DataFrame) for row in retrieved_objects for part in row): From 7571b3d5962ef95c30ad9c275f9dc2b8cf277182 Mon Sep 17 00:00:00 2001 From: William Ma Date: Wed, 5 Sep 2018 22:25:56 -0700 Subject: [PATCH 16/28] Moved dtypes stuff to data manager --- modin/data_management/data_manager.py | 83 +++++++++++++++------------ modin/pandas/dataframe.py | 56 ++++-------------- 2 files changed, 58 insertions(+), 81 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 55dd28d5098..8657cd21917 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -4,6 +4,7 @@ import numpy as np import pandas +from pandas.core.dtypes.common import _get_dtype_from_object from .partitioning.partition_collections import BlockPartitions, RayBlockPartitions from .partitioning.remote_partition import RayRemotePartition @@ -14,11 +15,15 @@ class PandasDataManager(object): with a Pandas backend. This logic is specific to Pandas. 
""" - def __init__(self, block_partitions_object, index, columns): + def __init__(self, block_partitions_object, index, columns, dtypes): assert isinstance(block_partitions_object, BlockPartitions) self.data = block_partitions_object self.index = index self.columns = columns + if dtypes: + self.dtypes = dtypes + else: + self.dtypes = self.data.map_across_full_axis(0, lambda df: df.dtypes).to_pandas() # Index and columns objects # These objects are currently not distributed. @@ -51,7 +56,7 @@ def _set_columns(self, new_columns): columns = property(_get_columns, _set_columns) index = property(_get_index, _set_index) - # END Index and columns objects + # END Index, columns, and dtypes objects def compute_index(self, data_object): """Computes the index after a number of rows have been removed. @@ -89,12 +94,12 @@ def _prepare_method(self, pandas_func, **kwargs): def add_prefix(self, prefix): cls = type(self) new_column_names = self.columns.map(lambda x: str(prefix) + str(x)) - return cls(self.data, self.index, new_column_names) + return cls(self.data, self.index, new_column_names, self.dtypes) def add_suffix(self, suffix): cls = type(self) new_column_names = self.columns.map(lambda x: str(x) + str(suffix)) - return cls(self.data, self.index, new_column_names) + return cls(self.data, self.index, new_column_names, self.dtypes) # END Metadata modification methods # Copy @@ -103,7 +108,7 @@ def add_suffix(self, suffix): # to prevent that. 
def copy(self): cls = type(self) - return cls(self.data.copy(), self.index.copy(), self.columns.copy()) + return cls(self.data.copy(), self.index.copy(), self.columns.copy(), self.dtypes.copy()) # Append/Concat/Join (Not Merge) # The append/concat/join operations should ideally never trigger remote @@ -161,7 +166,7 @@ def _append_data_manager(self, other, ignore_index): new_data = new_self.concat(0, to_append) new_index = self.index.append(other.index) if not ignore_index else pandas.RangeIndex(len(self.index) + len(other.index)) - return cls(new_data, new_index, joined_columns) + return cls(new_data, new_index, joined_columns, None) def _append_list_of_managers(self, others, ignore_index): assert isinstance(others, list), \ @@ -178,7 +183,7 @@ def _append_list_of_managers(self, others, ignore_index): new_data = new_self.concat(0, to_append) new_index = self.index.append([other.index for other in others]) if not ignore_index else pandas.RangeIndex(len(self.index) + sum([len(other.index) for other in others])) - return cls(new_data, new_index, joined_columns) + return cls(new_data, new_index, joined_columns, None) def _join_data_manager(self, other, **kwargs): assert isinstance(other, type(self)), \ @@ -204,7 +209,7 @@ def _join_data_manager(self, other, **kwargs): other_proxy = pandas.DataFrame(columns=other.columns) new_columns = self_proxy.join(other_proxy, lsuffix=lsuffix, rsuffix=rsuffix).columns - return cls(new_data, joined_index, new_columns) + return cls(new_data, joined_index, new_columns, None) def _join_list_of_managers(self, others, **kwargs): assert isinstance(others, list), \ @@ -238,7 +243,7 @@ def _join_list_of_managers(self, others, **kwargs): others_proxy = [pandas.DataFrame(columns=other.columns) for other in others] new_columns = self_proxy.join(others_proxy, lsuffix=lsuffix, rsuffix=rsuffix).columns - return cls(new_data, joined_index, new_columns) + return cls(new_data, joined_index, new_columns, None) # END Append/Concat/Join (Not Merge) # 
Inter-Data operations (e.g. add, sub) @@ -274,7 +279,7 @@ def inter_data_op_builder(left, right, self_cols, other_cols, func): new_data = reindexed_self.inter_data_operation(1, lambda l, r: inter_data_op_builder(l, r, self_cols, other_cols, func), reindexed_other) - return cls(new_data, joined_index, new_columns) + return cls(new_data, joined_index, new_columns, None) # END Inter-Data operations # Single Manager scalar operations (e.g. add to scalar, list of scalars) @@ -318,7 +323,7 @@ def reindex_builer(df, axis, old_labels, new_labels, **kwargs): # Additionally this operation is often followed by an operation that # assumes identical partitioning. Internally, we *may* change the # partitioning during a map across a full axis. - return cls(self.map_across_full_axis(axis, func), new_index, new_columns) + return cls(self.map_across_full_axis(axis, func), new_index, new_columns, None) def reset_index(self, **kwargs): cls = type(self) @@ -333,7 +338,7 @@ def reset_index(self, **kwargs): else: # The copies here are to ensure that we do not give references to # this object for the purposes of updates. - return cls(self.data.copy(), new_index, self.columns.copy()) + return cls(self.data.copy(), new_index, self.columns.copy(), self.dtypes.copy()) # END Reindex/reset_index # Transpose @@ -353,7 +358,7 @@ def transpose(self, *args, **kwargs): cls = type(self) new_data = self.data.transpose(*args, **kwargs) # Switch the index and columns and transpose the - new_manager = cls(new_data, self.columns, self.index) + new_manager = cls(new_data, self.columns, self.index, None) # It is possible that this is already transposed new_manager._is_transposed = self._is_transposed ^ 1 return new_manager @@ -422,7 +427,7 @@ def sum(self, **kwargs): # These operations are operations that apply a function to every partition. 
def map_partitions(self, func): cls = type(self) - return cls(self.data.map_across_blocks(func), self.index, self.columns) + return cls(self.data.map_across_blocks(func), self.index, self.columns, None) def abs(self): func = self._prepare_method(pandas.DataFrame.abs) @@ -643,7 +648,7 @@ def query_builder(df, **kwargs): # Query removes rows, so we need to update the index new_index = self.compute_index(new_data) - return cls(new_data, new_index, self.columns) + return cls(new_data, new_index, self.columns, self.dtypes) def eval(self, expr, **kwargs): cls = type(self) @@ -670,7 +675,7 @@ def eval_builder(df, **kwargs): else: columns = columns_copy.columns - return cls(new_data, self.index, columns) + return cls(new_data, self.index, columns, None) def quantile_for_list_of_values(self, **kwargs): cls = type(self) @@ -684,14 +689,14 @@ def quantile_for_list_of_values(self, **kwargs): new_data = self.map_across_full_axis(axis, func) new_columns = self.columns if not axis else self.index - return cls(new_data, q_index, new_columns) + return cls(new_data, q_index, new_columns, None) def _cumulative_builder(self, func, **kwargs): cls = type(self) axis = kwargs.get("axis", 0) func = self._prepare_method(func, **kwargs) new_data = self.map_across_full_axis(axis, func) - return cls(new_data, self.index, self.columns) + return cls(new_data, self.index, self.columns, self.dtypes) def cumsum(self, **kwargs): return self._cumulative_builder(pandas.DataFrame.cumsum, **kwargs) @@ -765,7 +770,7 @@ def mode(self, **kwargs): # We build these intermediate objects to avoid depending directly on # the underlying implementation. 
final_data = cls(new_data, new_index, new_columns).map_across_full_axis(axis, lambda df: df.reindex(axis=axis, labels=final_labels)) - return cls(final_data, new_index, new_columns) + return cls(final_data, new_index, new_columns, self.dtypes) def fillna(self, **kwargs): cls = type(self) @@ -778,7 +783,7 @@ def fillna(self, **kwargs): else: func = self._prepare_method(pandas.DataFrame.fillna, **kwargs) new_data = self.map_across_full_axis(axis, func) - return cls(new_data, self.index, self.columns) + return cls(new_data, self.index, self.columns, None) def describe(self, **kwargs): cls = type(self) @@ -788,8 +793,9 @@ def describe(self, **kwargs): new_data = self.map_across_full_axis(axis, func) new_index = new_data.get_indices(axis=0) new_columns = self.columns[new_data.get_indices(axis=1, old_blocks=self.data)] + new_dtypes = pd.Series([np.float64 for _ in new_columns], index=new_columns) - return cls(new_data, new_index, new_columns) + return cls(new_data, new_index, new_columns, new_dtypes) def rank(self, **kwargs): cls = type(self) @@ -801,8 +807,9 @@ def rank(self, **kwargs): new_columns = self.columns[new_data.get_indices(axis=1, old_blocks=self.data)] else: new_columns = self.columns + new_dtypes = pd.Series([np.float64 for _ in new_columns], index=new_columns) - return cls(new_data, self.index, new_columns) + return cls(new_data, self.index, new_columns, new_dtypes) # END Map across rows/columns # Head/Tail/Front/Back @@ -816,40 +823,40 @@ def head(self, n): # ensure that we extract the correct data on each node. The index # on a transposed manager is already set to the correct value, so # we need to only take the head of that instead of re-transposing. 
- result = cls(self.data.transpose().take(1, n).transpose(), self.index[:n], self.columns) + result = cls(self.data.transpose().take(1, n).transpose(), self.index[:n], self.columns, self.dtypes) result._is_transposed = True else: - result = cls(self.data.take(0, n), self.index[:n], self.columns) + result = cls(self.data.take(0, n), self.index[:n], self.columns, self.dtypes) return result def tail(self, n): cls = type(self) # See head for an explanation of the transposed behavior if self._is_transposed: - result = cls(self.data.transpose().take(1, -n).transpose(), self.index[-n:], self.columns) + result = cls(self.data.transpose().take(1, -n).transpose(), self.index[-n:], self.columns, self.dtypes) result._is_transposed = True else: - result = cls(self.data.take(0, -n), self.index[-n:], self.columns) + result = cls(self.data.take(0, -n), self.index[-n:], self.columns, self.dtypes) return result def front(self, n): cls = type(self) # See head for an explanation of the transposed behavior if self._is_transposed: - result = cls(self.data.transpose().take(0, n).transpose(), self.index, self.columns[:n]) + result = cls(self.data.transpose().take(0, n).transpose(), self.index, self.columns[:n], self.dtypes[:n]) result._is_transposed = True else: - result = cls(self.data.take(1, n), self.index, self.columns[:n]) + result = cls(self.data.take(1, n), self.index, self.columns[:n], self.dtypes[:n]) return result def back(self, n): cls = type(self) # See head for an explanation of the transposed behavior if self._is_transposed: - result = cls(self.data.transpose().take(0, -n).transpose(), self.index, self.columns[-n:]) + result = cls(self.data.transpose().take(0, -n).transpose(), self.index, self.columns[-n:], self.dtypes[-n:]) result._is_transposed = True else: - result = cls(self.data.take(1, -n), self.index, self.columns[-n:]) + result = cls(self.data.take(1, -n), self.index, self.columns[-n:], self.dtypes[-n:]) return result # End Head/Tail/Front/Back @@ -872,13 +879,14 @@ 
def to_pandas(self): def from_pandas(cls, df, block_partitions_cls): new_index = df.index new_columns = df.columns + new_dtypes = df.dtypes # Set the columns to RangeIndex for memory efficiency df.index = pandas.RangeIndex(len(df.index)) df.columns = pandas.RangeIndex(len(df.columns)) new_data = block_partitions_cls.from_pandas(df) - return cls(new_data, new_index, new_columns) + return cls(new_data, new_index, new_columns, dtypes=new_dtypes) # __getitem__ methods def getitem_single_key(self, key): @@ -908,7 +916,8 @@ def getitem(df, internal_indices=[]): # We can't just set the columns to key here because there may be # multiple instances of a key. new_columns = self.columns[numeric_indices] - return cls(result, self.index, new_columns) + new_dtypes = self.dtypes[numeric_indices] + return cls(result, self.index, new_columns, new_dtypes) def getitem_row_array(self, key): cls = type(self) @@ -922,7 +931,7 @@ def getitem(df, internal_indices=[]): # We can't just set the index to key here because there may be multiple # instances of a key. new_index = self.index[numeric_indices] - return cls(result, new_index, self.columns) + return cls(result, new_index, self.columns, self.dtypes) # END __getitem__ methods # __delitem__ and drop @@ -957,7 +966,8 @@ def delitem(df, internal_indices=[]): # We can't use self.columns.drop with duplicate keys because in Pandas # it throws an error. 
new_columns = [self.columns[i] for i in range(len(self.columns)) if i not in numeric_indices] - return cls(new_data, new_index, new_columns) + new_dtypes = [self.dtypes[i] for i in range(len(self.dtypes)) if i not in numeric_indices] + return cls(new_data, new_index, new_columns, new_dtypes) # END __delitem__ and drop # Insert @@ -975,7 +985,8 @@ def insert(df, internal_indices=[]): new_data = self.data.apply_func_to_select_indices_along_full_axis(0, insert, loc, keep_remaining=True) new_columns = self.columns.insert(loc, column) - return cls(new_data, self.index, new_columns) + new_dtypes = self.dtypes.insert(loc, _get_dtype_from_object(value)) + return cls(new_data, self.index, new_columns, new_dtypes) # END Insert # UDF (apply and agg) methods @@ -1022,7 +1033,7 @@ def callable_apply_builder(df, func, axis, index, *args, **kwargs): series_result.index = self.index return series_result - return cls(result_data, index, columns) + return cls(result_data, index, columns, None) class RayPandasDataManager(PandasDataManager): diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 11fb03209c2..4ef49b9404a 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -47,7 +47,6 @@ def __init__(self, block_partitions=None, row_metadata=None, col_metadata=None, - dtypes_cache=None, data_manager=None): """Distributed DataFrame object backed by Pandas dataframes. 
@@ -77,8 +76,6 @@ def __init__(self, self._data_manager = data._data_manager return - self._dtypes_cache = dtypes_cache - # Check type of data and use appropriate constructor if data is not None or (col_partitions is None and row_partitions is None @@ -92,9 +89,6 @@ def __init__(self, dtype=dtype, copy=copy) - # Cache dtypes - self._dtypes_cache = pandas_df.dtypes - self._data_manager = from_pandas(pandas_df)._data_manager else: if data_manager is not None: @@ -124,11 +118,6 @@ def __init__(self, partitions = col_partitions axis_length = len(index) if index is not None else \ len(row_metadata) - # All partitions will already have correct dtypes - self._dtypes_cache = [ - _deploy_func.remote(lambda df: df.dtypes, pandas_df) - for pandas_df in col_partitions - ] # TODO: write explicit tests for "short and wide" # column partitions @@ -136,9 +125,6 @@ def __init__(self, _create_block_partitions(partitions, axis=axis, length=axis_length) - if self._dtypes_cache is None and data_manager is None: - self._get_remote_dtypes() - if data_manager is None: self._data_manager = RayPandasDataManager._from_old_block_partitions(self._block_partitions, index, columns) @@ -341,14 +327,6 @@ def ftypes(self): result.index = self.columns return result - def _get_remote_dtypes(self): - """Finds and caches ObjectIDs for the dtypes of each column partition. - """ - self._dtypes_cache = [ - _compile_remote_dtypes.remote(*column) - for column in self._block_partitions.T - ] - @property def dtypes(self): """Get the dtypes for this DataFrame. @@ -356,16 +334,7 @@ def dtypes(self): Returns: The dtypes for this DataFrame. 
""" - assert self._dtypes_cache is not None - - if isinstance(self._dtypes_cache, list) and \ - isinstance(self._dtypes_cache[0], - ray.ObjectID): - self._dtypes_cache = pandas.concat( - ray.get(self._dtypes_cache), copy=False) - self._dtypes_cache.index = self.columns - - return self._dtypes_cache + return self._data_manager.dtypes @property def empty(self): @@ -413,7 +382,6 @@ def _update_inplace(self, new_manager): old_manager = self._data_manager self._data_manager = new_manager old_manager.free() - self._get_remote_dtypes() def add_prefix(self, prefix): """Add a prefix to each of the column names. @@ -526,7 +494,7 @@ def abs(self): Returns: A new DataFrame with the applied absolute value. """ - for t in self.dtypes: + for t in self._data_manager.dtypes: if np.dtype('O') == t: # TODO Give a more accurate error to Pandas raise TypeError("bad operand type for abs():", "str") @@ -1786,9 +1754,7 @@ def get_dtype_counts(self): Returns: The counts of dtypes in this object. """ - return ray.get( - _deploy_func.remote(lambda df: df.get_dtype_counts(), - self._row_partitions[0])) + return self.dtypes.value_counts() def get_ftype_counts(self): """Get the counts of ftypes in this object. @@ -1885,7 +1851,7 @@ def idxmin(self, axis=0, skipna=True): A Series with the index for each minimum value for the axis specified. 
""" - if not all(d != np.dtype('O') for d in self.dtypes): + if not all(d != np.dtype('O') for d in self._data_manager.dtypes): raise TypeError( "reduction operation 'argmax' not allowed for this dtype") @@ -1934,7 +1900,7 @@ def info(self, """ index = self._data_manager.index columns = self._data_manager.columns - dtypes = self.dtypes + dtypes = self._data_manager.dtypes # Set up default values verbose = True if verbose is None else verbose @@ -2870,20 +2836,20 @@ def check_bad_dtype(t): if not numeric_only: # check if there are any object columns - if all(check_bad_dtype(t) for t in self.dtypes): + if all(check_bad_dtype(t) for t in self._data_manager.dtypes): raise TypeError("can't multiply sequence by non-int of type " "'float'") else: - if next((True for t in self.dtypes if check_bad_dtype(t)), + if next((True for t in self._data_manager.dtypes if check_bad_dtype(t)), False): - dtype = next(t for t in self.dtypes if check_bad_dtype(t)) + dtype = next(t for t in self._data_manager.dtypes if check_bad_dtype(t)) raise ValueError("Cannot compare type '{}' with type '{}'" .format(type(dtype), float)) else: # Normally pandas returns this near the end of the quantile, but we # can't afford the overhead of running the entire operation before # we error. 
- if all(check_bad_dtype(t) for t in self.dtypes): + if all(check_bad_dtype(t) for t in self._data_manager.dtypes): raise ValueError("need at least one array to concatenate") # check that all qs are between 0 and 1 @@ -3395,7 +3361,7 @@ def is_dtype_instance_mapper(column, dtype): return column, functools.partial(issubclass, dtype.type) for column, f in itertools.starmap(is_dtype_instance_mapper, - self.dtypes.iteritems()): + self._data_manager.dtypes.iteritems()): if include: # checks for the case of empty include or exclude include_these[column] = any(map(f, include)) if exclude: @@ -4756,7 +4722,7 @@ def __neg__(self): Returns: A modified DataFrame where every element is the negation of before """ - for t in self.dtypes: + for t in self._data_manager.dtypes: if not (is_bool_dtype(t) or is_numeric_dtype(t) or is_timedelta64_dtype(t)): raise TypeError("Unary negative expects numeric dtype, not {}" From e70aaec7bea5a4c21679eff339c094420c39c02b Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 6 Sep 2018 16:19:45 -0700 Subject: [PATCH 17/28] Fixed calculating dtypes to only doing a full_reduce instead of map_full_axis --- modin/data_management/data_manager.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 8657cd21917..d95abe0d3e1 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -4,6 +4,7 @@ import numpy as np import pandas +from pandas.core.dtypes.cast import find_common_type from pandas.core.dtypes.common import _get_dtype_from_object from .partitioning.partition_collections import BlockPartitions, RayBlockPartitions @@ -23,7 +24,12 @@ def __init__(self, block_partitions_object, index, columns, dtypes): if dtypes: self.dtypes = dtypes else: - self.dtypes = self.data.map_across_full_axis(0, lambda df: df.dtypes).to_pandas() + map_func = lambda df: df.dtypes + def func(row): + return 
find_common_type(row.values) + reduce_func = lambda df: df.T.apply(func, axis=1) + self.dtypes = self.data.full_reduce(map_func, reduce_func, 0) + self.dtypes.index = self.columns # Index and columns objects # These objects are currently not distributed. From 59d6c412483a781bb0a150fd87e80593dcc14787 Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 6 Sep 2018 21:10:19 -0700 Subject: [PATCH 18/28] Updated astype to new backend --- modin/data_management/data_manager.py | 40 +++++++++++++++++++++++-- modin/pandas/dataframe.py | 42 ++++++++------------------- 2 files changed, 49 insertions(+), 33 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index d95abe0d3e1..55daadcbf63 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -21,7 +21,7 @@ def __init__(self, block_partitions_object, index, columns, dtypes): self.data = block_partitions_object self.index = index self.columns = columns - if dtypes: + if dtypes is not None: self.dtypes = dtypes else: map_func = lambda df: df.dtypes @@ -340,7 +340,7 @@ def reset_index(self, **kwargs): new_column_name = "index" if "index" not in self.columns else "level_0" new_columns = self.columns.insert(0, new_column_name) result = self.insert(0, new_column_name, self.index) - return cls(result.data, new_index, new_columns) + return cls(result.data, new_index, new_columns, None) else: # The copies here are to ensure that we do not give references to # this object for the purposes of updates. @@ -972,7 +972,9 @@ def delitem(df, internal_indices=[]): # We can't use self.columns.drop with duplicate keys because in Pandas # it throws an error. 
new_columns = [self.columns[i] for i in range(len(self.columns)) if i not in numeric_indices] - new_dtypes = [self.dtypes[i] for i in range(len(self.dtypes)) if i not in numeric_indices] + dtypes = dtypes.values + new_dtypes = pd.Series([dtypes[i] for i in range(len(dtypes)) if i not in numeric_indices]) + new_dtypes.index = new_columns return cls(new_data, new_index, new_columns, new_dtypes) # END __delitem__ and drop @@ -995,6 +997,38 @@ def insert(df, internal_indices=[]): return cls(new_data, self.index, new_columns, new_dtypes) # END Insert + # astype + # This method changes the types of select columns to the new dtype. + def astype(self, col_dtypes, errors='raise', **kwargs): + cls = type(self) + + print(col_dtypes) + # Group the indicies to update together and create new dtypes series + dtype_indices = dict() + new_dtypes = self.dtypes + columns = col_dtypes.keys() + numeric_indices = list(self.columns.get_indexer_for(columns)) + for i, column in enumerate(columns): + if col_dtypes[column] not in col_dtypes.keys(): + dtype_indices[col_dtypes[column]] = [numeric_indices[i]] + else: + dtype_indices[col_dtypes[column]].append(numeric_indices[i]) + new_dtypes[column] = col_dtypes[column] + + new_data = self.data + for dtype in dtype_indices.keys(): + + def astype(df, internal_indices=[]): + block_dtypes = dict() + for ind in internal_indices: + block_dtypes[df.columns[ind]]= dtype + return df.astype(block_dtypes) + + new_data = self.data.apply_func_to_select_indices(0, astype, dtype_indices[dtype], keep_remaining=True) + + return cls(new_data, self.index, self.columns, new_dtypes) + # END astype + # UDF (apply and agg) methods # There is a wide range of behaviors that are supported, so a lot of the # logic can get a bit convoluted. 
diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 4ef49b9404a..1b07e2bb8ee 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1016,42 +1016,24 @@ def assign(self, **kwargs): "github.com/modin-project/modin.") def astype(self, dtype, copy=True, errors='raise', **kwargs): + col_dtypes = dict() if isinstance(dtype, dict): if (not set(dtype.keys()).issubset(set(self.columns)) and errors == 'raise'): raise KeyError("Only a column name can be used for the key in" "a dtype mappings argument.") - columns = list(dtype.keys()) - col_idx = [(self.columns.get_loc(columns[i]), columns[i]) if - columns[i] in self.columns else (columns[i], columns[i]) - for i in range(len(columns))] - new_dict = {} - for idx, key in col_idx: - new_dict[idx] = dtype[key] - new_rows = _map_partitions(lambda df, dt: df.astype(dtype=dt, - copy=True, - errors=errors, - **kwargs), - self._row_partitions, new_dict) - if copy: - return DataFrame( - row_partitions=new_rows, - columns=self.columns, - index=self.index) - self._row_partitions = new_rows + col_dtypes = dtype + + else: + for column in self._data_manager.columns: + col_dtypes[column] = dtype + + print(col_dtypes) + new_data_manager = self._data_manager.astype(col_dtypes, errors, **kwargs) + if copy: + return DataFrame(data_manager = new_data_manager) else: - new_blocks = [_map_partitions(lambda d: d.astype(dtype=dtype, - copy=True, - errors=errors, - **kwargs), - block) - for block in self._block_partitions] - if copy: - return DataFrame( - block_partitions=new_blocks, - columns=self.columns, - index=self.index) - self._block_partitions = new_blocks + self._update_inplace(new_data_manager) def at_time(self, time, asof=False): raise NotImplementedError( From f53917fa19007f1516da3949f3a1c00f3394007a Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 6 Sep 2018 22:07:35 -0700 Subject: [PATCH 19/28] Updated astype to new backend --- modin/data_management/data_manager.py | 18 ++++++++++++------ 
modin/pandas/dataframe.py | 3 +-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 55daadcbf63..3dbfd75d8e5 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -1002,21 +1002,27 @@ def insert(df, internal_indices=[]): def astype(self, col_dtypes, errors='raise', **kwargs): cls = type(self) - print(col_dtypes) # Group the indicies to update together and create new dtypes series dtype_indices = dict() - new_dtypes = self.dtypes columns = col_dtypes.keys() + new_dtypes = self.dtypes.copy() numeric_indices = list(self.columns.get_indexer_for(columns)) for i, column in enumerate(columns): - if col_dtypes[column] not in col_dtypes.keys(): - dtype_indices[col_dtypes[column]] = [numeric_indices[i]] + dtype = col_dtypes[column] + if dtype in dtype_indices.keys(): + dtype_indices[dtype].append(numeric_indices[i]) else: - dtype_indices[col_dtypes[column]].append(numeric_indices[i]) - new_dtypes[column] = col_dtypes[column] + dtype_indices[dtype] = [numeric_indices[i]] + new_dtype = np.dtype(dtype) + if dtype != np.int32 and new_dtype == np.int32: + new_dtype = np.int64 + elif dtype != np.float32 and new_dtype == np.float32: + new_dtype = np.float64 + new_dtypes[column] = new_dtype new_data = self.data for dtype in dtype_indices.keys(): + resulting_dtype = None def astype(df, internal_indices=[]): block_dtypes = dict() diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 1b07e2bb8ee..6f4b52d4b14 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1028,10 +1028,9 @@ def astype(self, dtype, copy=True, errors='raise', **kwargs): for column in self._data_manager.columns: col_dtypes[column] = dtype - print(col_dtypes) new_data_manager = self._data_manager.astype(col_dtypes, errors, **kwargs) if copy: - return DataFrame(data_manager = new_data_manager) + return 
DataFrame(data_manager=new_data_manager) else: self._update_inplace(new_data_manager) From e1b01fc4dd251642e36b4050e7568727faa605b6 Mon Sep 17 00:00:00 2001 From: William Ma Date: Thu, 6 Sep 2018 22:15:03 -0700 Subject: [PATCH 20/28] Updated ftypes to new backend --- modin/pandas/dataframe.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 6f4b52d4b14..eee0986d973 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -322,9 +322,10 @@ def ftypes(self): """ # The ftypes are common across all partitions. # The first partition will be enough. - result = ray.get( - _deploy_func.remote(lambda df: df.ftypes, self._row_partitions[0])) - result.index = self.columns + dtypes = self._data_manager.dtypes.copy() + ftypes = ["{0}:dense".format(str(dtype)) + for dtype in dtypes.values] + result = pandas.Series(ftypes, index=self.columns) return result @property From 3c28c8f4c7e860c137816205fb2488d05ffea964 Mon Sep 17 00:00:00 2001 From: William Ma Date: Fri, 7 Sep 2018 09:50:34 -0700 Subject: [PATCH 21/28] Added dtypes argument to map_partitions --- modin/data_management/data_manager.py | 28 ++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 3dbfd75d8e5..4091ae6f252 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -431,13 +431,14 @@ def sum(self, **kwargs): # Map partitions operations # These operations are operations that apply a function to every partition. 
- def map_partitions(self, func): + def map_partitions(self, func, new_dtypes=None): cls = type(self) - return cls(self.data.map_across_blocks(func), self.index, self.columns, None) + return cls(self.data.map_across_blocks(func), self.index, self.columns, new_dtypes) def abs(self): func = self._prepare_method(pandas.DataFrame.abs) - return self.map_partitions(func) + new_dtypes = pandas.Series([np.dtype('float64') for _ in self.columns], index=self.columns) + return self.map_partitions(func, new_dtypes=new_dtypes) def applymap(self, func): remote_func = self._prepare_method(pandas.DataFrame.applymap, func=func) @@ -445,15 +446,18 @@ def applymap(self, func): def isin(self, **kwargs): func = self._prepare_method(pandas.DataFrame.isin, **kwargs) - return self.map_partitions(func) + new_dtypes = pandas.Series([np.dtype('bool') for _ in self.columns], index=self.columns) + return self.map_partitions(func, new_dtypes=new_dtypes) def isna(self): func = self._prepare_method(pandas.DataFrame.isna) - return self.map_partitions(func) + new_dtypes = pandas.Series([np.dtype('bool') for _ in self.columns], index=self.columns) + return self.map_partitions(func, new_dtypes=new_dtypes) def isnull(self): func = self._prepare_method(pandas.DataFrame.isnull) - return self.map_partitions(func) + new_dtypes = pandas.Series([np.dtype('bool') for _ in self.columns], index=self.columns) + return self.map_partitions(func, new_dtypes=new_dtypes) def negative(self, **kwargs): func = self._prepare_method(pandas.DataFrame.__neg__, **kwargs) @@ -461,15 +465,17 @@ def negative(self, **kwargs): def notna(self): func = self._prepare_method(pandas.DataFrame.notna) - return self.map_partitions(func) + new_dtypes = pandas.Series([np.dtype('bool') for _ in self.columns], index=self.columns) + return self.map_partitions(func, new_dtypes=new_dtypes) def notnull(self): func = self._prepare_method(pandas.DataFrame.notnull) - return self.map_partitions(func) + new_dtypes = pandas.Series([np.dtype('bool') 
for _ in self.columns], index=self.columns) + return self.map_partitions(func, new_dtypes=new_dtypes) def round(self, **kwargs): func = self._prepare_method(pandas.DataFrame.round, **kwargs) - return self.map_partitions(func) + return self.map_partitions(func, new_dtypes=self.dtypes.copy()) # END Map partitions operations # Column/Row partitions reduce operations @@ -1015,9 +1021,9 @@ def astype(self, col_dtypes, errors='raise', **kwargs): dtype_indices[dtype] = [numeric_indices[i]] new_dtype = np.dtype(dtype) if dtype != np.int32 and new_dtype == np.int32: - new_dtype = np.int64 + new_dtype = np.dtype('int64') elif dtype != np.float32 and new_dtype == np.float32: - new_dtype = np.float64 + new_dtype = np.dtype('float64') new_dtypes[column] = new_dtype new_data = self.data From c48c861e2272ebeb732efaa0e2c1a60e515d32b0 Mon Sep 17 00:00:00 2001 From: William Ma Date: Fri, 7 Sep 2018 13:48:55 -0700 Subject: [PATCH 22/28] Updated astype and added dtypes option to _from_old_block_partitions in RayPandasDataManager --- modin/data_management/data_manager.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 27807e2359b..95ad2c560fe 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -1187,16 +1187,17 @@ def astype(self, col_dtypes, errors='raise', **kwargs): for i, column in enumerate(columns): dtype = col_dtypes[column] - if dtype in dtype_indices.keys(): - dtype_indices[dtype].append(numeric_indices[i]) - else: - dtype_indices[dtype] = [numeric_indices[i]] - new_dtype = np.dtype(dtype) - if dtype != np.int32 and new_dtype == np.int32: - new_dtype = np.dtype('int64') - elif dtype != np.float32 and new_dtype == np.float32: - new_dtype = np.dtype('float64') - new_dtypes[column] = new_dtype + if dtype != self.dtypes[column]: + if dtype in dtype_indices.keys(): + 
dtype_indices[dtype].append(numeric_indices[i]) + else: + dtype_indices[dtype] = [numeric_indices[i]] + new_dtype = np.dtype(dtype) + if dtype != np.int32 and new_dtype == np.int32: + new_dtype = np.dtype('int64') + elif dtype != np.float32 and new_dtype == np.float32: + new_dtype = np.dtype('float64') + new_dtypes[column] = new_dtype for dtype in dtype_indices.keys(): @@ -1328,7 +1329,7 @@ class RayPandasDataManager(PandasDataManager): @classmethod def _from_old_block_partitions(cls, blocks, index, columns): blocks = np.array([[RayRemotePartition(obj) for obj in row] for row in blocks]) - return PandasDataManager(RayBlockPartitions(blocks), index, columns) + return PandasDataManager(RayBlockPartitions(blocks), index, columns, None) def pandas_index_extraction(df, axis): From a47f8bed4a4f4c9223f72f0099d8e7aa31169942 Mon Sep 17 00:00:00 2001 From: William Ma Date: Fri, 7 Sep 2018 14:11:45 -0700 Subject: [PATCH 23/28] Undid unnecessary change --- modin/data_management/data_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index c2c77a65973..76d585fe296 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -1352,7 +1352,7 @@ class RayPandasDataManager(PandasDataManager): @classmethod def _from_old_block_partitions(cls, blocks, index, columns): blocks = np.array([[RayRemotePartition(obj) for obj in row] for row in blocks]) - return PandasDataManager(RayBlockPartitions(blocks), index, columns, None) + return PandasDataManager(RayBlockPartitions(blocks), index, columns) def pandas_index_extraction(df, axis): From 9c3823191b9df4c099ad365e3ce29530083a3fad Mon Sep 17 00:00:00 2001 From: William Ma Date: Fri, 7 Sep 2018 17:12:43 -0700 Subject: [PATCH 24/28] Updated iterables to new backend --- modin/pandas/dataframe.py | 33 ++++++++++++--------------------- modin/pandas/iterator.py | 24 ++++++++++++++---------- 2 files changed, 
26 insertions(+), 31 deletions(-) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 08eb384604f..047ba1c6037 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1912,17 +1912,14 @@ def iterrows(self): Returns: A generator that iterates over the rows of the frame. """ - index_iter = (self._row_metadata.partition_series(i).index - for i in range(len(self._row_partitions))) + index_iter = iter(self.index) - def iterrow_helper(part): - df = ray.get(part) + def iterrow_builder(df): df.columns = self.columns - df.index = next(index_iter) + df.index = [next(index_iter)] return df.iterrows() - partition_iterator = PartitionIterator(self._row_partitions, - iterrow_helper) + partition_iterator = PartitionIterator(self._data_manager, 0, iterrow_builder) for v in partition_iterator: yield v @@ -1938,17 +1935,14 @@ def items(self): Returns: A generator that iterates over the columns of the frame. """ - col_iter = (self._col_metadata.partition_series(i).index - for i in range(len(self._col_partitions))) + col_iter = iter(self.columns) - def items_helper(part): - df = ray.get(part) - df.columns = next(col_iter) + def items_builder(df): + df.columns = [next(col_iter)] df.index = self.index return df.items() - partition_iterator = PartitionIterator(self._col_partitions, - items_helper) + partition_iterator = PartitionIterator(self._data_manager, 1, items_builder) for v in partition_iterator: yield v @@ -1980,17 +1974,14 @@ def itertuples(self, index=True, name='Pandas'): Returns: A tuple representing row data. See args for varying tuples. 
""" - index_iter = (self._row_metadata.partition_series(i).index - for i in range(len(self._row_partitions))) + index_iter = iter(self.index) - def itertuples_helper(part): - df = ray.get(part) + def itertuples_builder(df): df.columns = self.columns - df.index = next(index_iter) + df.index = [next(index_iter)] return df.itertuples(index=index, name=name) - partition_iterator = PartitionIterator(self._row_partitions, - itertuples_helper) + partition_iterator = PartitionIterator(self._data_manager, 0, itertuples_builder) for v in partition_iterator: yield v diff --git a/modin/pandas/iterator.py b/modin/pandas/iterator.py index 78a2af5e6b1..3743d605e05 100644 --- a/modin/pandas/iterator.py +++ b/modin/pandas/iterator.py @@ -6,17 +6,19 @@ class PartitionIterator(Iterator): - def __init__(self, partitions, func): + def __init__(self, data_manager, axis, func): """PartitionIterator class to define a generator on partitioned data Args: - partitions ([ObjectID]): Partitions to iterate over + data_manager (DataManager): Data manager for the dataframe + axis (int): axis to iterate over func (callable): The function to get inner iterables from each partition """ - self.partitions = iter(partitions) + self.data_manager = data_manager + self.axis = axis + self.index_iter = iter(self.data_manager.columns) if axis else iter(self.data_manager.index) self.func = func - self.iter_cache = iter([]) def __iter__(self): return self @@ -25,9 +27,11 @@ def __next__(self): return self.next() def next(self): - try: - return next(self.iter_cache) - except StopIteration: - next_partition = next(self.partitions) - self.iter_cache = self.func(next_partition) - return self.next() + if self.axis: + key = next(self.index_iter) + df = self.data_manager.getitem_column_array([key]).to_pandas() + else: + key = next(self.index_iter) + df = self.data_manager.getitem_row_array([key]).to_pandas() + return next(self.func(df)) + From c8b6301cfca8481ef9748516ab16b443c09ae756 Mon Sep 17 00:00:00 2001 From: 
William Ma Date: Sun, 9 Sep 2018 17:44:26 -0700 Subject: [PATCH 25/28] Updated to_datetime to new backend --- modin/data_management/data_manager.py | 39 ++++++---------------- modin/pandas/datetimes.py | 48 +++++++++------------------ 2 files changed, 25 insertions(+), 62 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 76d585fe296..7b8c37b2a6a 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -759,6 +759,14 @@ def std(self, **kwargs): func = self._prepare_method(pandas.DataFrame.std, **kwargs) return self.full_axis_reduce(func, axis) + def to_datetime(self, **kwargs): + columns = self.columns + def to_datetime_builder(df, **kwargs): + df.columns = columns + return pandas.to_datetime(df, **kwargs) + func = self._prepare_method(to_datetime_builder, **kwargs) + return self.full_axis_reduce(func, 1) + def var(self, **kwargs): # Pandas default is 0 (though not mentioned in docs) axis = kwargs.get("axis", 0) @@ -805,33 +813,6 @@ def query_builder(df, **kwargs): return cls(new_data, new_index, self.columns, self.dtypes) - def eval(self, expr, **kwargs): - cls = type(self) - columns = self.columns - - def eval_builder(df, **kwargs): - df.columns = columns - result = df.eval(expr, inplace=False, **kwargs) - # If result is a series, expr was not an assignment expression. 
- if not isinstance(result, pandas.Series): - result.columns = pandas.RangeIndex(0, len(result.columns)) - return result - - func = self._prepare_method(eval_builder, **kwargs) - new_data = self.map_across_full_axis(1, func) - - # eval can update the columns, so we must update columns - columns_copy = pandas.DataFrame(columns=columns) - columns_copy = columns_copy.eval(expr, inplace=False, **kwargs) - if isinstance(columns_copy, pandas.Series): - # To create a data manager, we need the - # columns to be in a list-like - columns = list(columns_copy.name) - else: - columns = columns_copy.columns - - return cls(new_data, self.index, columns, None) - def eval(self, expr, **kwargs): cls = type(self) columns = self.columns @@ -1198,7 +1179,7 @@ def insert(df, internal_indices=[]): # END Insert # astype - # This method changes the types of select columns to the new dtype. + # This method change the dtypes of column(s) def astype(self, col_dtypes, errors='raise', **kwargs): cls = type(self) @@ -1234,7 +1215,7 @@ def astype(df, internal_indices=[]): new_data = self.data.apply_func_to_select_indices(0, astype, dtype_indices[dtype], keep_remaining=True) return cls(new_data, self.index, self.columns, new_dtypes) - # END astype + # END type conversions # UDF (apply and agg) methods # There is a wide range of behaviors that are supported, so a lot of the diff --git a/modin/pandas/datetimes.py b/modin/pandas/datetimes.py index 77a03de607c..a3e293b2fbc 100644 --- a/modin/pandas/datetimes.py +++ b/modin/pandas/datetimes.py @@ -56,38 +56,20 @@ def to_datetime(arg, unit=unit, infer_datetime_format=infer_datetime_format, origin=origin) - if errors == 'raise': - pandas.to_datetime( - pandas.DataFrame(columns=arg.columns), - errors=errors, - dayfirst=dayfirst, - yearfirst=yearfirst, - utc=utc, - box=box, - format=format, - exact=exact, - unit=unit, - infer_datetime_format=infer_datetime_format, - origin=origin) - - def datetime_helper(df, cols): - df.columns = cols - return 
pandas.to_datetime( - df, - errors=errors, - dayfirst=dayfirst, - yearfirst=yearfirst, - utc=utc, - box=box, - format=format, - exact=exact, - unit=unit, - infer_datetime_format=infer_datetime_format, - origin=origin) - datetime_series = _map_partitions(datetime_helper, arg._row_partitions, - arg.columns) - result = pandas.concat(ray.get(datetime_series), copy=False) - result.index = arg.index + # Pandas seems to ignore this kwarg so we will too + #if errors == 'raise': + pandas.to_datetime( + pandas.DataFrame(columns=arg.columns), + errors=errors, + dayfirst=dayfirst, + yearfirst=yearfirst, + utc=utc, + box=box, + format=format, + exact=exact, + unit=unit, + infer_datetime_format=infer_datetime_format, + origin=origin) - return result + return arg._data_manager.to_datetime() From a9af405888858e57aaa3ebd5739eaedd0edbcfb4 Mon Sep 17 00:00:00 2001 From: William Ma Date: Sun, 9 Sep 2018 18:40:10 -0700 Subject: [PATCH 26/28] Reverted some changes for PR --- modin/data_management/data_manager.py | 36 +++++++++++++-------------- modin/pandas/dataframe.py | 26 +++++++++---------- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 7b8c37b2a6a..afe7c4dc9cc 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -194,7 +194,7 @@ def _append_data_manager(self, other, ignore_index): new_data = new_self.concat(0, to_append) new_index = self.index.append(other.index) if not ignore_index else pandas.RangeIndex(len(self.index) + len(other.index)) - return cls(new_data, new_index, joined_columns, None) + return cls(new_data, new_index, joined_columns) def _append_list_of_managers(self, others, ignore_index): assert isinstance(others, list), \ @@ -211,7 +211,7 @@ def _append_list_of_managers(self, others, ignore_index): new_data = new_self.concat(0, to_append) new_index = self.index.append([other.index for other in others]) if not ignore_index 
else pandas.RangeIndex(len(self.index) + sum([len(other.index) for other in others])) - return cls(new_data, new_index, joined_columns, None) + return cls(new_data, new_index, joined_columns) def _join_data_manager(self, other, **kwargs): assert isinstance(other, type(self)), \ @@ -237,7 +237,7 @@ def _join_data_manager(self, other, **kwargs): other_proxy = pandas.DataFrame(columns=other.columns) new_columns = self_proxy.join(other_proxy, lsuffix=lsuffix, rsuffix=rsuffix).columns - return cls(new_data, joined_index, new_columns, None) + return cls(new_data, joined_index, new_columns) def _join_list_of_managers(self, others, **kwargs): assert isinstance(others, list), \ @@ -271,8 +271,8 @@ def _join_list_of_managers(self, others, **kwargs): others_proxy = [pandas.DataFrame(columns=other.columns) for other in others] new_columns = self_proxy.join(others_proxy, lsuffix=lsuffix, rsuffix=rsuffix).columns - return cls(new_data, joined_index, new_columns, None) - # END Append/Concat/Join (Not Merge) + return cls(new_data, joined_index, new_columns) + # END Append/Concat/Join # Inter-Data operations (e.g. 
add, sub) # These operations require two DataFrames and will change the shape of the @@ -307,7 +307,7 @@ def inter_data_op_builder(left, right, self_cols, other_cols, func): new_data = reindexed_self.inter_data_operation(1, lambda l, r: inter_data_op_builder(l, r, self_cols, other_cols, func), reindexed_other) - return cls(new_data, joined_index, new_columns, None) + return cls(new_data, joined_index, new_columns) def _inter_df_op_handler(self, func, other, **kwargs): """Helper method for inter-DataFrame and scalar operations""" @@ -355,7 +355,7 @@ def where_builder_series(df, cond, other, **kwargs): reindexed_cond = cond.reindex(axis, self.index if not axis else self.columns).data new_data = reindexed_self.inter_data_operation(axis, lambda l, r: where_builder_series(l, r, other, **kwargs), reindexed_cond) - return cls(new_data, self.index, self.columns, None) + return cls(new_data, self.index, self.columns) def update(self, other, **kwargs): assert isinstance(other, type(self)), \ @@ -481,7 +481,7 @@ def reset_index(self, **kwargs): new_column_name = "index" if "index" not in self.columns else "level_0" new_columns = self.columns.insert(0, new_column_name) result = self.insert(0, new_column_name, self.index) - return cls(result.data, new_index, new_columns, None) + return cls(result.data, new_index, new_columns) else: # The copies here are to ensure that we do not give references to # this object for the purposes of updates. 
@@ -505,7 +505,7 @@ def transpose(self, *args, **kwargs): cls = type(self) new_data = self.data.transpose(*args, **kwargs) # Switch the index and columns and transpose the - new_manager = cls(new_data, self.columns, self.index, None) + new_manager = cls(new_data, self.columns, self.index) # It is possible that this is already transposed new_manager._is_transposed = self._is_transposed ^ 1 return new_manager @@ -852,7 +852,7 @@ def quantile_for_list_of_values(self, **kwargs): new_data = self.map_across_full_axis(axis, func) new_columns = self.columns if not axis else self.index - return cls(new_data, q_index, new_columns, None) + return cls(new_data, q_index, new_columns) def _cumulative_builder(self, func, **kwargs): cls = type(self) @@ -875,8 +875,8 @@ def cumprod(self, **kwargs): def dropna(self, **kwargs): axis = kwargs.get("axis", 0) - subset = kwargs.get("subset", None) - thresh = kwargs.get("thresh", None) + subset = kwargs.get("subset") + thresh = kwargs.get("thresh") how = kwargs.get("how", "any") # We need to subset the axis that we care about with `subset`. This # will be used to determine the number of values that are NA. 
@@ -939,7 +939,7 @@ def fillna(self, **kwargs): cls = type(self) axis = kwargs.get("axis", 0) - value = kwargs.pop("value", None) + value = kwargs.pop("value") if isinstance(value, dict): if axis == 0: @@ -956,7 +956,7 @@ def fillna_dict_builder(df, func_dict={}): else: func = self._prepare_method(pandas.DataFrame.fillna, **kwargs) new_data = self.map_across_full_axis(axis, func) - return cls(new_data, self.index, self.columns, None) + return cls(new_data, self.index, self.columns) def describe(self, **kwargs): cls = type(self) @@ -984,7 +984,7 @@ def rank(self, **kwargs): new_columns = self.compute_index(1, new_data, True) else: new_columns = self.columns - new_dtypes = pd.Series([np.float64 for _ in new_columns], index=new_columns) + new_dtypes = pandas.Series([np.float64 for _ in new_columns], index=new_columns) return cls(new_data, self.index, new_columns, new_dtypes) def diff(self, **kwargs): @@ -995,7 +995,7 @@ def diff(self, **kwargs): func = self._prepare_method(pandas.DataFrame.diff, **kwargs) new_data = self.map_across_full_axis(axis, func) - return cls(new_data, self.index, self.columns, None) + return cls(new_data, self.index, self.columns) # END Map across rows/columns # Head/Tail/Front/Back @@ -1179,7 +1179,7 @@ def insert(df, internal_indices=[]): # END Insert # astype - # This method change the dtypes of column(s) + # This method changes the types of select columns to the new dtype. 
def astype(self, col_dtypes, errors='raise', **kwargs): cls = type(self) @@ -1254,7 +1254,7 @@ def _post_process_apply(self, result_data, axis): series_result.index = index return series_result - return cls(result_data, index, columns, None) + return cls(result_data, index, columns) def _dict_func(self, func, axis, *args, **kwargs): if "axis" not in kwargs: diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 047ba1c6037..ddc8147d460 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -495,7 +495,7 @@ def abs(self): Returns: A new DataFrame with the applied absolute value. """ - for t in self._data_manager.dtypes: + for t in self.dtypes: if np.dtype('O') == t: # TODO Give a more accurate error to Pandas raise TypeError("bad operand type for abs():", "str") @@ -1732,7 +1732,7 @@ def idxmin(self, axis=0, skipna=True): A Series with the index for each minimum value for the axis specified. """ - if not all(d != np.dtype('O') for d in self._data_manager.dtypes): + if not all(d != np.dtype('O') for d in self.dtypes): raise TypeError( "reduction operation 'argmax' not allowed for this dtype") @@ -1779,9 +1779,9 @@ def info(self, Returns: Prints the summary of a DataFrame and returns None. 
""" - index = self._data_manager.index - columns = self._data_manager.columns - dtypes = self._data_manager.dtypes + index = self.index + columns = self.columns + dtypes = self.dtypes # Set up default values verbose = True if verbose is None else verbose @@ -2646,20 +2646,20 @@ def check_bad_dtype(t): if not numeric_only: # check if there are any object columns - if all(check_bad_dtype(t) for t in self._data_manager.dtypes): + if all(check_bad_dtype(t) for t in self.dtypes): raise TypeError("can't multiply sequence by non-int of type " "'float'") else: - if next((True for t in self._data_manager.dtypes if check_bad_dtype(t)), + if next((True for t in self.dtypes if check_bad_dtype(t)), False): - dtype = next(t for t in self._data_manager.dtypes if check_bad_dtype(t)) + dtype = next(t for t in self.dtypes if check_bad_dtype(t)) raise ValueError("Cannot compare type '{}' with type '{}'" .format(type(dtype), float)) else: # Normally pandas returns this near the end of the quantile, but we # can't afford the overhead of running the entire operation before # we error. 
- if all(check_bad_dtype(t) for t in self._data_manager.dtypes): + if all(check_bad_dtype(t) for t in self.dtypes): raise ValueError("need at least one array to concatenate") # check that all qs are between 0 and 1 @@ -3028,10 +3028,10 @@ def sample(self, else 0 if axis == 0: - axis_labels = self._data_manager.index + axis_labels = self.index axis_length = len(axis_labels) else: - axis_labels = self._data_manager.column + axis_labels = self.column axis_length = len(axis_labels) if weights is not None: @@ -3171,7 +3171,7 @@ def is_dtype_instance_mapper(column, dtype): return column, functools.partial(issubclass, dtype.type) for column, f in itertools.starmap(is_dtype_instance_mapper, - self._data_manager.dtypes.iteritems()): + self.dtypes.iteritems()): if include: # checks for the case of empty include or exclude include_these[column] = any(map(f, include)) if exclude: @@ -4438,7 +4438,7 @@ def __neg__(self): Returns: A modified DataFrame where every element is the negation of before """ - for t in self._data_manager.dtypes: + for t in self.dtypes: if not (is_bool_dtype(t) or is_numeric_dtype(t) or is_timedelta64_dtype(t)): raise TypeError("Unary negative expects numeric dtype, not {}" From 7e3c03d0884723c2194a1880ac0db5bd31ef9837 Mon Sep 17 00:00:00 2001 From: William Ma Date: Sun, 9 Sep 2018 19:34:52 -0700 Subject: [PATCH 27/28] Replaced pd with pandas --- modin/data_management/data_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index afe7c4dc9cc..1ff4638b928 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -966,7 +966,7 @@ def describe(self, **kwargs): new_data = self.map_across_full_axis(axis, func) new_index = self.compute_index(0, new_data, False) new_columns = self.compute_index(1, new_data, True) - new_dtypes = pd.Series([np.float64 for _ in new_columns], index=new_columns) + new_dtypes = 
pandas.Series([np.float64 for _ in new_columns], index=new_columns) return cls(new_data, new_index, new_columns, new_dtypes) @@ -1154,7 +1154,7 @@ def delitem(df, internal_indices=[]): # it throws an error. new_columns = [self.columns[i] for i in range(len(self.columns)) if i not in numeric_indices] dtypes = dtypes.values - new_dtypes = pd.Series([dtypes[i] for i in range(len(dtypes)) if i not in numeric_indices]) + new_dtypes = pandas.Series([dtypes[i] for i in range(len(dtypes)) if i not in numeric_indices]) new_dtypes.index = new_columns return cls(new_data, new_index, new_columns, new_dtypes) # END __delitem__ and drop From ebea57718220e3c494dd9f5450814957091bb562 Mon Sep 17 00:00:00 2001 From: William Ma Date: Sun, 9 Sep 2018 22:39:04 -0700 Subject: [PATCH 28/28] Made additional changes mentioned in (#7) --- modin/data_management/data_manager.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 1ff4638b928..189fc60596b 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -1153,9 +1153,7 @@ def delitem(df, internal_indices=[]): # We can't use self.columns.drop with duplicate keys because in Pandas # it throws an error. 
new_columns = [self.columns[i] for i in range(len(self.columns)) if i not in numeric_indices] - dtypes = dtypes.values - new_dtypes = pandas.Series([dtypes[i] for i in range(len(dtypes)) if i not in numeric_indices]) - new_dtypes.index = new_columns + new_dtypes = self.dtypes.drop(columns) return cls(new_data, new_index, new_columns, new_dtypes) # END __delitem__ and drop @@ -1174,7 +1172,13 @@ def insert(df, internal_indices=[]): new_data = self.data.apply_func_to_select_indices_along_full_axis(0, insert, loc, keep_remaining=True) new_columns = self.columns.insert(loc, column) - new_dtypes = self.dtypes.insert(loc, _get_dtype_from_object(value)) + + # Because a Pandas Series does not allow insert, we make a DataFrame + # and insert the new dtype that way. + temp_dtypes = pandas.DataFrame(self.dtypes).T + temp_dtypes.insert(loc, column, _get_dtype_from_object(value)) + new_dtypes = temp_dtypes.iloc[0] + return cls(new_data, self.index, new_columns, new_dtypes) # END Insert @@ -1187,7 +1191,9 @@ def astype(self, col_dtypes, errors='raise', **kwargs): dtype_indices = dict() columns = col_dtypes.keys() new_dtypes = self.dtypes.copy() + numeric_indices = list(self.columns.get_indexer_for(columns)) + for i, column in enumerate(columns): dtype = col_dtypes[column] if dtype != self.dtypes[column]: