# Review notes (free text placed before the first patch header).
#
# What this patch does:
#   * data_manager.py: the old axis-dispatching `concat` is split in two.
#     `join` dispatches to `_join_list_of_managers` / `_join_data_manager`;
#     `concat` always delegates to `_append_list_of_managers`, which now takes
#     `axis` and reads `sort`, `join` and `ignore_index` from **kwargs.
#     `_append_data_manager` is removed, and `_append_list_of_managers`
#     asserts that it receives a list.
#   * data_manager.py, `_join_list_of_managers`: the two asserts and
#     `cls = type(self)` are removed.
#     NOTE(review): the rest of that method is outside the hunk - confirm that
#     `cls` is no longer referenced there.
#   * data_manager.py, `from_pandas`: the frame's index/columns are no longer
#     overwritten with RangeIndex before partitioning.
#   * pandas/concat.py: the block-level reindex logic is replaced by a call to
#     `DataManager.concat`.
#   * pandas/dataframe.py: `DataFrame.join` calls `DataManager.join` instead
#     of `DataManager.concat(1, ...)`.
#   * test_concat.py: adds `test_ray_concat_with_series`.
#
# Review change relative to the submitted patch:
#   * pandas/concat.py passed a hard-coded `ignore_index=False` to
#     `DataManager.concat`, while the removed code used the caller's
#     `ignore_index` and `_append_list_of_managers` reads that kwarg. The
#     caller's value is now forwarded. The blob ids on the `index` line for
#     that file describe the patch as submitted, not this revision.
#
# Formatting caveat:
#   * The patch text was recovered from a whitespace-collapsed copy. Line
#     breaks and blank context lines were restored to agree with each hunk
#     header's line counts; leading indentation was inferred from Python
#     syntax. TODO confirm against the target tree (a whitespace-tolerant
#     apply may be needed).
#
diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py
index e1c8ce65e03..96ae8bc1659 100644
--- a/modin/data_management/data_manager.py
+++ b/modin/data_management/data_manager.py
@@ -169,49 +169,44 @@ def _join_index_objects(self, axis, other_index, how, sort=True):
         else:
             return self.index.join(other_index, how=how, sort=sort)
 
-    def concat(self, axis, other, **kwargs):
-        ignore_index = kwargs.get("ignore_index", False)
-        if axis == 0:
-            if isinstance(other, list):
-                return self._append_list_of_managers(other, ignore_index)
-            else:
-                return self._append_data_manager(other, ignore_index)
+    def join(self, other, **kwargs):
+        if isinstance(other, list):
+            return self._join_list_of_managers(other, **kwargs)
         else:
-            if isinstance(other, list):
-                return self._join_list_of_managers(other, **kwargs)
-            else:
-                return self._join_data_manager(other, **kwargs)
-
-    def _append_data_manager(self, other, ignore_index):
-        assert isinstance(other, type(self)), \
-            "This method is for data manager objects only"
-        cls = type(self)
+            return self._join_data_manager(other, **kwargs)
 
-        joined_columns = self._join_index_objects(0, other.columns, 'outer')
-        to_append = other.reindex(1, joined_columns).data
-        new_self = self.reindex(1, joined_columns).data
-
-        new_data = new_self.concat(0, to_append)
-        new_index = self.index.append(other.index) if not ignore_index else pandas.RangeIndex(len(self.index) + len(other.index))
-
-        return cls(new_data, new_index, joined_columns)
+    def concat(self, axis, other, **kwargs):
+        return self._append_list_of_managers(other, axis, **kwargs)
 
-    def _append_list_of_managers(self, others, ignore_index):
+    def _append_list_of_managers(self, others, axis, **kwargs):
         assert isinstance(others, list), \
             "This method is for lists of DataManager objects only"
         assert all(isinstance(other, type(self)) for other in others), \
             "Different Manager objects are being used. This is not allowed"
         cls = type(self)
 
-        joined_columns = self._join_index_objects(0, [other.columns for other in others], 'outer')
+        sort = kwargs.get("sort", None)
+        join = kwargs.get("join", "outer")
+        ignore_index = kwargs.get("ignore_index", False)
+
+        joined_axis = self._join_index_objects(axis, [other.columns if axis == 0
+                                                      else other.index for other in others], join, sort=sort)
+
+        to_append = [other.reindex(axis ^ 1, joined_axis).data for other in others]
+        new_self = self.reindex(axis ^ 1, joined_axis).data
+        new_data = new_self.concat(axis, to_append)
 
-        to_append = [other.reindex(1, joined_columns).data for other in others]
-        new_self = self.reindex(1, joined_columns).data
+        if axis == 0:
+            new_index = self.index.append([other.index for other in others]) if not ignore_index else pandas.RangeIndex(len(self.index) + sum([len(other.index) for other in others]))
+
+            return cls(new_data, new_index, joined_axis)
+        else:
+            self_proxy_columns = pandas.DataFrame(columns=self.columns).columns
+            others_proxy_columns = [pandas.DataFrame(columns=other.columns).columns for other in others]
+            new_columns = self_proxy_columns.append(others_proxy_columns)
 
-        new_data = new_self.concat(0, to_append)
-        new_index = self.index.append([other.index for other in others]) if not ignore_index else pandas.RangeIndex(len(self.index) + sum([len(other.index) for other in others]))
+            return cls(new_data, joined_axis, new_columns)
 
-        return cls(new_data, new_index, joined_columns)
 
     def _join_data_manager(self, other, **kwargs):
         assert isinstance(other, type(self)), \
@@ -252,12 +247,6 @@ def _join_list_of_managers(self, others, **kwargs):
         lsuffix = kwargs.get("lsuffix", "")
         rsuffix = kwargs.get("rsuffix", "")
 
-        assert isinstance(others, list), \
-            "This method is for lists of DataManager objects only"
-        assert all(isinstance(other, type(self)) for other in others), \
-            "Different Manager objects are being used. This is not allowed"
-        cls = type(self)
-
         joined_index = self._join_index_objects(1, [other.index for other in others], how, sort=sort)
 
         to_join = [other.reindex(0, joined_index).data for other in others]
@@ -1069,9 +1058,6 @@ def from_pandas(cls, df, block_partitions_cls):
         new_columns = df.columns
         new_dtypes = df.dtypes
 
-        # Set the columns to RangeIndex for memory efficiency
-        df.index = pandas.RangeIndex(len(df.index))
-        df.columns = pandas.RangeIndex(len(df.columns))
         new_data = block_partitions_cls.from_pandas(df)
 
         return cls(new_data, new_index, new_columns, dtypes=new_dtypes)
diff --git a/modin/pandas/concat.py b/modin/pandas/concat.py
index c4bb2945cb5..dadeda63fce 100644
--- a/modin/pandas/concat.py
+++ b/modin/pandas/concat.py
@@ -64,89 +64,20 @@ def concat(objs,
         raise ValueError("Only can inner (intersect) or outer (union) join the"
                          " other axis")
 
-    # We need this in a list because we use it later.
-    all_index, all_columns = list(
-        zip(*[(obj.index, obj.columns) for obj in objs]))
-
-    def series_to_df(series, columns):
-        df = pandas.DataFrame(series)
-        df.columns = columns
-        return DataFrame(df)
-
-    # Pandas puts all of the Series in a single column named 0. This is
-    # true regardless of the existence of another column named 0 in the
-    # concat.
-    if axis == 0:
-        objs = [
-            series_to_df(obj, [0]) if isinstance(obj, pandas.Series) else obj
-            for obj in objs
-        ]
-    else:
-        # Pandas starts the count at 0 so this will increment the names as
-        # long as there's a new nameless Series being added.
-        def name_incrementer(i):
-            val = i[0]
-            i[0] += 1
-            return val
-
-        i = [0]
-        objs = [
-            series_to_df(
-                obj, obj.name if obj.name is not None else name_incrementer(i))
-            if isinstance(obj, pandas.Series) else obj for obj in objs
-        ]
-
-    # Using concat on the columns and index is fast because they're empty,
-    # and it forces the error checking. It also puts the columns in the
-    # correct order for us.
-    final_index = \
-        pandas.concat([pandas.DataFrame(index=idx) for idx in all_index],
-                      axis=axis, join=join, join_axes=join_axes,
-                      ignore_index=ignore_index, keys=keys, levels=levels,
-                      names=names, verify_integrity=verify_integrity,
-                      copy=False).index
-    final_columns = \
-        pandas.concat([pandas.DataFrame(columns=col)
-                       for col in all_columns],
-                      axis=axis, join=join, join_axes=join_axes,
-                      ignore_index=ignore_index, keys=keys, levels=levels,
-                      names=names, verify_integrity=verify_integrity,
-                      copy=False).columns
-
-    # Put all of the DataFrames into Ray format
-    # TODO just partition the DataFrames instead of building a new Ray DF.
-    objs = [
-        DataFrame(obj)
-        if isinstance(obj, (pandas.DataFrame, pandas.Series)) else obj
-        for obj in objs
-    ]
-
-    # Here we reuse all_columns/index so we don't have to materialize objects
-    # from remote memory built in the previous line. In the future, we won't be
-    # building new DataFrames, rather just partitioning the DataFrames.
-    if axis == 0:
-        new_blocks = np.array([
-            _reindex_helper._submit(
-                args=tuple([
-                    all_columns[i], final_columns, axis,
-                    len(objs[0]._block_partitions)
-                ] + part.tolist()),
-                num_return_vals=len(objs[0]._block_partitions))
-            for i in range(len(objs)) for part in objs[i]._block_partitions
-        ])
-    else:
-        # Transposing the columns is necessary because the remote task treats
-        # everything like rows and returns in row-major format. Luckily, this
-        # operation is cheap in numpy.
-        new_blocks = np.array([
-            _reindex_helper._submit(
-                args=tuple([
-                    all_index[i], final_index, axis,
-                    len(objs[0]._block_partitions.T)
-                ] + part.tolist()),
-                num_return_vals=len(objs[0]._block_partitions.T))
-            for i in range(len(objs)) for part in objs[i]._block_partitions.T
-        ]).T
-
-    return DataFrame(
-        block_partitions=new_blocks, columns=final_columns, index=final_index)
+    # We have the weird Series and axis check because, when concatenating a
+    # dataframe to a series on axis=0, pandas ignores the name of the series,
+    # and this check aims to mirror that (possibly buggy) functionality
+    objs = [obj if isinstance(obj, DataFrame) else DataFrame(obj.rename()) if
+            isinstance(obj, pandas.Series) and axis == 0 else DataFrame(obj)
+            for obj in objs]
+    df = objs[0]
+    objs = [obj._data_manager for obj in objs]
+    new_manager = df._data_manager.concat(axis, objs[1:], join=join,
+                                          join_axes=None,
+                                          ignore_index=ignore_index,
+                                          keys=None,
+                                          levels=None,
+                                          names=None,
+                                          verify_integrity=False,
+                                          copy=True)
+    return DataFrame(data_manager=new_manager)
diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py
index ff97c95bb98..1d88062ae73 100644
--- a/modin/pandas/dataframe.py
+++ b/modin/pandas/dataframe.py
@@ -2011,7 +2011,7 @@ def join(self,
             # would otherwise require a lot more logic.
             pandas.DataFrame(columns=self.columns).join(pandas.DataFrame(columns=other.columns), lsuffix=lsuffix, rsuffix=rsuffix).columns
 
-            return DataFrame(data_manager=self._data_manager.concat(1, other._data_manager, how=how, lsuffix=lsuffix, rsuffix=rsuffix, sort=sort))
+            return DataFrame(data_manager=self._data_manager.join(other._data_manager, how=how, lsuffix=lsuffix, rsuffix=rsuffix, sort=sort))
         else:
             # This constraint carried over from Pandas.
             if on is not None:
@@ -2024,7 +2024,7 @@ def join(self,
                                                         lsuffix=lsuffix,
                                                         rsuffix=rsuffix).columns
 
-            return DataFrame(data_manager=self._data_manager.concat(1, [obj._data_manager for obj in other], how=how, lsuffix=lsuffix, rsuffix=rsuffix, sort=sort))
+            return DataFrame(data_manager=self._data_manager.join([obj._data_manager for obj in other], how=how, lsuffix=lsuffix, rsuffix=rsuffix, sort=sort))
 
     def kurt(self,
              axis=None,
diff --git a/modin/pandas/test/test_concat.py b/modin/pandas/test/test_concat.py
index 1f64e3162a3..dab13e8877d 100644
--- a/modin/pandas/test/test_concat.py
+++ b/modin/pandas/test/test_concat.py
@@ -68,6 +68,18 @@ def test_ray_concat():
     assert ray_df_equals_pandas(
         pd.concat([ray_df, ray_df2]), pandas.concat([df, df2]))
 
+def test_ray_concat_with_series():
+    df, df2 = generate_dfs()
+    ray_df, ray_df2 = from_pandas(df), from_pandas(df2)
+    pandas_series = pandas.Series([1,2,3,4], name="new_col")
+
+    assert ray_df_equals_pandas(
+        pd.concat([ray_df, ray_df2, pandas_series], axis=0), pandas.concat([df,
+            df2, pandas_series], axis=0))
+
+    assert ray_df_equals_pandas(
+        pd.concat([ray_df, ray_df2, pandas_series], axis=1), pandas.concat([df,
+            df2, pandas_series], axis=1))
 
 def test_ray_concat_on_index():
     df, df2 = generate_dfs()