Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 26 additions & 40 deletions modin/data_management/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,49 +169,44 @@ def _join_index_objects(self, axis, other_index, how, sort=True):
else:
return self.index.join(other_index, how=how, sort=sort)

def concat(self, axis, other, **kwargs):
ignore_index = kwargs.get("ignore_index", False)
if axis == 0:
if isinstance(other, list):
return self._append_list_of_managers(other, ignore_index)
else:
return self._append_data_manager(other, ignore_index)
def join(self, other, **kwargs):
    """Join this manager with another manager (or list of managers).

    Args:
        other: a single DataManager or a list of DataManagers to join with.
        **kwargs: join options forwarded to the helpers
            (e.g. how, sort, lsuffix, rsuffix).

    Returns:
        A new DataManager holding the joined data.
    """
    # Dispatch on the argument shape; the helpers assert the manager types.
    if isinstance(other, list):
        return self._join_list_of_managers(other, **kwargs)
    else:
        return self._join_data_manager(other, **kwargs)
def concat(self, axis, other, **kwargs):
    """Concatenate this manager with other manager(s) along an axis.

    Args:
        axis: 0 to stack rows, 1 to stack columns.
        other: a single DataManager or a list of DataManagers.
        **kwargs: pandas.concat-style options (join, sort, ignore_index).

    Returns:
        A new DataManager with the concatenated data.
    """
    # _append_list_of_managers asserts a list; accept a lone manager too,
    # since the previous concat() implementation supported both forms.
    if not isinstance(other, list):
        other = [other]
    return self._append_list_of_managers(other, axis, **kwargs)

def _append_list_of_managers(self, others, ignore_index):
def _append_list_of_managers(self, others, axis, **kwargs):
    """Concatenate this manager with a list of managers along `axis`.

    Args:
        others: list of DataManager objects of the same type as self.
        axis: 0 to stack rows, 1 to stack columns.
        **kwargs: supports sort, join, and ignore_index with
            pandas.concat semantics.

    Returns:
        A new DataManager holding the concatenated data.
    """
    assert isinstance(others, list), \
        "This method is for lists of DataManager objects only"
    assert all(isinstance(other, type(self)) for other in others), \
        "Different Manager objects are being used. This is not allowed"
    cls = type(self)

    sort = kwargs.get("sort", None)
    join = kwargs.get("join", "outer")
    ignore_index = kwargs.get("ignore_index", False)

    # Join on the labels of the *opposite* axis: column labels when
    # stacking rows (axis=0), index labels when stacking columns (axis=1).
    joined_axis = self._join_index_objects(axis, [other.columns if axis == 0
                                                  else other.index for other in others], join, sort=sort)

    # Reindex everything to the joined labels (axis ^ 1 flips 0<->1)
    # before concatenating the underlying block partitions.
    to_append = [other.reindex(axis ^ 1, joined_axis).data for other in others]
    new_self = self.reindex(axis ^ 1, joined_axis).data
    new_data = new_self.concat(axis, to_append)

    if axis == 0:
        # Row concat: append the indexes, or build a fresh RangeIndex of
        # the combined length when ignore_index=True.
        new_index = self.index.append([other.index for other in others]) if not ignore_index else pandas.RangeIndex(len(self.index) + sum([len(other.index) for other in others]))

        return cls(new_data, new_index, joined_axis)
    else:
        # Column concat: route the column labels through empty pandas
        # DataFrames so pandas applies its own duplicate/ordering rules.
        self_proxy_columns = pandas.DataFrame(columns=self.columns).columns
        others_proxy_columns = [pandas.DataFrame(columns=other.columns).columns for other in others]
        new_columns = self_proxy_columns.append(others_proxy_columns)

        return cls(new_data, joined_axis, new_columns)

def _join_data_manager(self, other, **kwargs):
assert isinstance(other, type(self)), \
Expand Down Expand Up @@ -252,12 +247,6 @@ def _join_list_of_managers(self, others, **kwargs):
lsuffix = kwargs.get("lsuffix", "")
rsuffix = kwargs.get("rsuffix", "")

assert isinstance(others, list), \
"This method is for lists of DataManager objects only"
assert all(isinstance(other, type(self)) for other in others), \
"Different Manager objects are being used. This is not allowed"
cls = type(self)

joined_index = self._join_index_objects(1, [other.index for other in others], how, sort=sort)

to_join = [other.reindex(0, joined_index).data for other in others]
Expand Down Expand Up @@ -1069,9 +1058,6 @@ def from_pandas(cls, df, block_partitions_cls):
new_columns = df.columns
new_dtypes = df.dtypes

# Set the columns to RangeIndex for memory efficiency
df.index = pandas.RangeIndex(len(df.index))
df.columns = pandas.RangeIndex(len(df.columns))
new_data = block_partitions_cls.from_pandas(df)

return cls(new_data, new_index, new_columns, dtypes=new_dtypes)
Expand Down
103 changes: 17 additions & 86 deletions modin/pandas/concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,89 +64,20 @@ def concat(objs,
raise ValueError("Only can inner (intersect) or outer (union) join the"
" other axis")

# We need this in a list because we use it later.
all_index, all_columns = list(
zip(*[(obj.index, obj.columns) for obj in objs]))

def series_to_df(series, columns):
df = pandas.DataFrame(series)
df.columns = columns
return DataFrame(df)

# Pandas puts all of the Series in a single column named 0. This is
# true regardless of the existence of another column named 0 in the
# concat.
if axis == 0:
objs = [
series_to_df(obj, [0]) if isinstance(obj, pandas.Series) else obj
for obj in objs
]
else:
# Pandas starts the count at 0 so this will increment the names as
# long as there's a new nameless Series being added.
def name_incrementer(i):
val = i[0]
i[0] += 1
return val

i = [0]
objs = [
series_to_df(
obj, obj.name if obj.name is not None else name_incrementer(i))
if isinstance(obj, pandas.Series) else obj for obj in objs
]

# Using concat on the columns and index is fast because they're empty,
# and it forces the error checking. It also puts the columns in the
# correct order for us.
final_index = \
pandas.concat([pandas.DataFrame(index=idx) for idx in all_index],
axis=axis, join=join, join_axes=join_axes,
ignore_index=ignore_index, keys=keys, levels=levels,
names=names, verify_integrity=verify_integrity,
copy=False).index
final_columns = \
pandas.concat([pandas.DataFrame(columns=col)
for col in all_columns],
axis=axis, join=join, join_axes=join_axes,
ignore_index=ignore_index, keys=keys, levels=levels,
names=names, verify_integrity=verify_integrity,
copy=False).columns

# Put all of the DataFrames into Ray format
# TODO just partition the DataFrames instead of building a new Ray DF.
objs = [
DataFrame(obj)
if isinstance(obj, (pandas.DataFrame, pandas.Series)) else obj
for obj in objs
]

# Here we reuse all_columns/index so we don't have to materialize objects
# from remote memory built in the previous line. In the future, we won't be
# building new DataFrames, rather just partitioning the DataFrames.
if axis == 0:
new_blocks = np.array([
_reindex_helper._submit(
args=tuple([
all_columns[i], final_columns, axis,
len(objs[0]._block_partitions)
] + part.tolist()),
num_return_vals=len(objs[0]._block_partitions))
for i in range(len(objs)) for part in objs[i]._block_partitions
])
else:
# Transposing the columns is necessary because the remote task treats
# everything like rows and returns in row-major format. Luckily, this
# operation is cheap in numpy.
new_blocks = np.array([
_reindex_helper._submit(
args=tuple([
all_index[i], final_index, axis,
len(objs[0]._block_partitions.T)
] + part.tolist()),
num_return_vals=len(objs[0]._block_partitions.T))
for i in range(len(objs)) for part in objs[i]._block_partitions.T
]).T

return DataFrame(
block_partitions=new_blocks, columns=final_columns, index=final_index)
# We have the weird Series and axis check because, when concatenating a
# dataframe to a series on axis=0, pandas ignores the name of the series,
# and this check aims to mirror that (possibly buggy) functionality
objs = [obj if isinstance(obj, DataFrame) else DataFrame(obj.rename()) if
isinstance(obj, pandas.Series) and axis == 0 else DataFrame(obj)
for obj in objs]
df = objs[0]
objs = [obj._data_manager for obj in objs]
new_manager = df._data_manager.concat(axis, objs[1:], join=join,
join_axes=None,
ignore_index=False,
keys=None,
levels=None,
names=None,
verify_integrity=False,
copy=True)
return DataFrame(data_manager=new_manager)
4 changes: 2 additions & 2 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2011,7 +2011,7 @@ def join(self,
# would otherwise require a lot more logic.
pandas.DataFrame(columns=self.columns).join(pandas.DataFrame(columns=other.columns), lsuffix=lsuffix, rsuffix=rsuffix).columns

return DataFrame(data_manager=self._data_manager.concat(1, other._data_manager, how=how, lsuffix=lsuffix, rsuffix=rsuffix, sort=sort))
return DataFrame(data_manager=self._data_manager.join(other._data_manager, how=how, lsuffix=lsuffix, rsuffix=rsuffix, sort=sort))
else:
# This constraint carried over from Pandas.
if on is not None:
Expand All @@ -2024,7 +2024,7 @@ def join(self,
lsuffix=lsuffix,
rsuffix=rsuffix).columns

return DataFrame(data_manager=self._data_manager.concat(1, [obj._data_manager for obj in other], how=how, lsuffix=lsuffix, rsuffix=rsuffix, sort=sort))
return DataFrame(data_manager=self._data_manager.join([obj._data_manager for obj in other], how=how, lsuffix=lsuffix, rsuffix=rsuffix, sort=sort))

def kurt(self,
axis=None,
Expand Down
12 changes: 12 additions & 0 deletions modin/pandas/test/test_concat.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ def test_ray_concat():
assert ray_df_equals_pandas(
pd.concat([ray_df, ray_df2]), pandas.concat([df, df2]))

def test_ray_concat_with_series():
    """Concat of Modin DataFrames plus a raw pandas Series matches pandas."""
    pandas_df1, pandas_df2 = generate_dfs()
    modin_df1, modin_df2 = from_pandas(pandas_df1), from_pandas(pandas_df2)
    series = pandas.Series([1, 2, 3, 4], name="new_col")

    # Row-wise concat (axis=0), then column-wise concat (axis=1).
    for concat_axis in (0, 1):
        assert ray_df_equals_pandas(
            pd.concat([modin_df1, modin_df2, series], axis=concat_axis),
            pandas.concat([pandas_df1, pandas_df2, series],
                          axis=concat_axis))

def test_ray_concat_on_index():
df, df2 = generate_dfs()
Expand Down