Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 72 additions & 1 deletion modin/data_management/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,83 @@ def inter_data_op_builder(left, right, self_cols, other_cols, func):
new_data = reindexed_self.inter_data_operation(1, lambda l, r: inter_data_op_builder(l, r, self_cols, other_cols, func), reindexed_other)

return cls(new_data, joined_index, new_columns)

def _inter_df_op_handler(self, func, other, **kwargs):
    """Dispatch a binary operation to the inter-manager or the scalar path.

    Args:
        func: Callable invoked as ``func(left, right, **kwargs)`` on the
            inter-manager path, or ``func(df, other, **kwargs)`` on the
            scalar path.
        other: Right-hand operand. An instance of this same class takes the
            inter-manager path; anything else takes the scalar path.
        **kwargs: Forwarded unchanged to ``func``. ``axis`` (default 0) is
            additionally read here to pick the axis for the scalar path.

    Returns:
        Whatever ``inter_manager_operations`` or ``scalar_operations``
        returns.

    Raises:
        ValueError: Raised by pandas when ``axis`` is not a recognised axis
            number or label.
    """
    # Callers may supply the axis as a label ("index" / "columns"); resolve
    # it to its number so scalar_operations always receives 0 or 1. The
    # original value stays in kwargs for func itself.
    axis = pandas.DataFrame()._get_axis_number(kwargs.get("axis", 0))

    if isinstance(other, type(self)):
        return self.inter_manager_operations(
            other, "outer", lambda x, y: func(x, y, **kwargs))
    return self.scalar_operations(
        axis, other, lambda df: func(df, other, **kwargs))

def add(self, other, **kwargs):
    """Run ``pandas.DataFrame.add`` against ``other``.

    ``other`` and ``kwargs`` are handed to ``_inter_df_op_handler``
    unchanged; its result is returned as-is.
    """
    # TODO: need to write a prepare_function for inter_df operations
    return self._inter_df_op_handler(pandas.DataFrame.add, other, **kwargs)

def div(self, other, **kwargs):
    """Run ``pandas.DataFrame.div`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(pandas.DataFrame.div, other, **kwargs)

def eq(self, other, **kwargs):
    """Run ``pandas.DataFrame.eq`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(pandas.DataFrame.eq, other, **kwargs)

def floordiv(self, other, **kwargs):
    """Run ``pandas.DataFrame.floordiv`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(
        pandas.DataFrame.floordiv, other, **kwargs)

def ge(self, other, **kwargs):
    """Run ``pandas.DataFrame.ge`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(pandas.DataFrame.ge, other, **kwargs)

def gt(self, other, **kwargs):
    """Run ``pandas.DataFrame.gt`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(pandas.DataFrame.gt, other, **kwargs)

def le(self, other, **kwargs):
    """Run ``pandas.DataFrame.le`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(pandas.DataFrame.le, other, **kwargs)

def lt(self, other, **kwargs):
    """Run ``pandas.DataFrame.lt`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(pandas.DataFrame.lt, other, **kwargs)

def mod(self, other, **kwargs):
    """Run ``pandas.DataFrame.mod`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(pandas.DataFrame.mod, other, **kwargs)

def mul(self, other, **kwargs):
    """Run ``pandas.DataFrame.mul`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(pandas.DataFrame.mul, other, **kwargs)

def ne(self, other, **kwargs):
    """Run ``pandas.DataFrame.ne`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(pandas.DataFrame.ne, other, **kwargs)

def pow(self, other, **kwargs):
    """Run ``pandas.DataFrame.pow`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(pandas.DataFrame.pow, other, **kwargs)

def sub(self, other, **kwargs):
    """Run ``pandas.DataFrame.sub`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(pandas.DataFrame.sub, other, **kwargs)

def truediv(self, other, **kwargs):
    """Run ``pandas.DataFrame.truediv`` against ``other``.

    Delegates to ``_inter_df_op_handler``, forwarding ``kwargs`` untouched.
    """
    return self._inter_df_op_handler(
        pandas.DataFrame.truediv, other, **kwargs)

# END Inter-Data operations

# Single Manager scalar operations (e.g. add to scalar, list of scalars)
def scalar_operations(self, axis, scalar, func):
    """Apply ``func`` for an operation against a scalar or a list.

    Args:
        axis: Axis handed to ``map_across_full_axis`` when ``scalar`` is a
            list. Unused otherwise.
        scalar: The operand. Only its type is inspected here; ``func`` is
            expected to have captured the value already.
        func: Callable passed to the mapping routine.

    Returns:
        For a list operand, a new instance of this class built from the
        mapped data together with this object's ``index`` and ``columns``.
        For any other operand, the result of ``self.map_partitions(func)``.
    """
    if not isinstance(scalar, list):
        # NOTE(review): only exact ``list`` instances take the full-axis
        # path; other list-likes (tuple, ndarray) land here -- confirm that
        # is intended.
        return self.map_partitions(func)

    # Previously an early ``return`` of the raw mapped data sat here and made
    # the wrapping below unreachable; the list branch now returns the
    # wrapped result.
    new_data = self.map_across_full_axis(axis, func)
    return type(self)(new_data, self.index, self.columns)
# END Single Manager scalar operations
Expand Down
187 changes: 142 additions & 45 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,16 @@ def add(self, other, axis='columns', level=None, fill_value=None):
Returns:
A new DataFrame with the applied addition.
"""
return self._operator_helper(pandas.DataFrame.add, other, axis, level,
fill_value)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.add(other=other,
axis=axis,
level=level,
fill_value=fill_value)
return self._create_dataframe_from_manager(new_manager)

def agg(self, func, axis=0, *args, **kwargs):
return self.aggregate(func, axis, *args, **kwargs)
Expand Down Expand Up @@ -1192,8 +1200,16 @@ def div(self, other, axis='columns', level=None, fill_value=None):
Returns:
A new DataFrame with the Divide applied.
"""
return self._operator_helper(pandas.DataFrame.div, other, axis, level,
fill_value)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.div(other=other,
axis=axis,
level=level,
fill_value=fill_value)
return self._create_dataframe_from_manager(new_manager)

def divide(self, other, axis='columns', level=None, fill_value=None):
"""Synonym for div.
Expand Down Expand Up @@ -1313,7 +1329,15 @@ def eq(self, other, axis='columns', level=None):
Returns:
A new DataFrame filled with Booleans.
"""
return self._operator_helper(pandas.DataFrame.eq, other, axis, level)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.eq(other=other,
axis=axis,
level=level)
return self._create_dataframe_from_manager(new_manager)

def equals(self, other):
"""
Expand Down Expand Up @@ -1562,8 +1586,16 @@ def floordiv(self, other, axis='columns', level=None, fill_value=None):
Returns:
A new DataFrame with the Divide applied.
"""
return self._operator_helper(pandas.DataFrame.floordiv, other, axis,
level, fill_value)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.floordiv(other=other,
axis=axis,
level=level,
fill_value=fill_value)
return self._create_dataframe_from_manager(new_manager)

@classmethod
def from_csv(self,
Expand Down Expand Up @@ -1614,7 +1646,15 @@ def ge(self, other, axis='columns', level=None):
Returns:
A new DataFrame filled with Booleans.
"""
return self._operator_helper(pandas.DataFrame.ge, other, axis, level)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.ge(other=other,
axis=axis,
level=level)
return self._create_dataframe_from_manager(new_manager)

def get(self, key, default=None):
"""Get item from object for given key (DataFrame column, Panel
Expand Down Expand Up @@ -1674,7 +1714,15 @@ def gt(self, other, axis='columns', level=None):
Returns:
A new DataFrame filled with Booleans.
"""
return self._operator_helper(pandas.DataFrame.gt, other, axis, level)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.gt(other=other,
axis=axis,
level=level)
return self._create_dataframe_from_manager(new_manager)

def head(self, n=5):
"""Get the first n rows of the DataFrame.
Expand Down Expand Up @@ -2056,7 +2104,15 @@ def le(self, other, axis='columns', level=None):
Returns:
A new DataFrame filled with Booleans.
"""
return self._operator_helper(pandas.DataFrame.le, other, axis, level)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.le(other=other,
axis=axis,
level=level)
return self._create_dataframe_from_manager(new_manager)

def lookup(self, row_labels, col_labels):
raise NotImplementedError(
Expand All @@ -2074,7 +2130,15 @@ def lt(self, other, axis='columns', level=None):
Returns:
A new DataFrame filled with Booleans.
"""
return self._operator_helper(pandas.DataFrame.lt, other, axis, level)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.lt(other=other,
axis=axis,
level=level)
return self._create_dataframe_from_manager(new_manager)

def mad(self, axis=None, skipna=None, level=None):
raise NotImplementedError(
Expand Down Expand Up @@ -2382,8 +2446,16 @@ def mod(self, other, axis='columns', level=None, fill_value=None):
Returns:
A new DataFrame with the Mod applied.
"""
return self._operator_helper(pandas.DataFrame.mod, other, axis, level,
fill_value)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.mod(other=other,
axis=axis,
level=level,
fill_value=fill_value)
return self._create_dataframe_from_manager(new_manager)

def mode(self, axis=0, numeric_only=False):
"""Perform mode across the DataFrame.
Expand Down Expand Up @@ -2411,8 +2483,16 @@ def mul(self, other, axis='columns', level=None, fill_value=None):
Returns:
A new DataFrame with the Multiply applied.
"""
return self._operator_helper(pandas.DataFrame.mul, other, axis, level,
fill_value)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.mul(other=other,
axis=axis,
level=level,
fill_value=fill_value)
return self._create_dataframe_from_manager(new_manager)

def multiply(self, other, axis='columns', level=None, fill_value=None):
"""Synonym for mul.
Expand All @@ -2439,7 +2519,15 @@ def ne(self, other, axis='columns', level=None):
Returns:
A new DataFrame filled with Booleans.
"""
return self._operator_helper(pandas.DataFrame.ne, other, axis, level)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.ne(other=other,
axis=axis,
level=level)
return self._create_dataframe_from_manager(new_manager)

def nlargest(self, n, columns, keep='first'):
raise NotImplementedError(
Expand Down Expand Up @@ -2584,8 +2672,16 @@ def pow(self, other, axis='columns', level=None, fill_value=None):
Returns:
A new DataFrame with the Pow applied.
"""
return self._operator_helper(pandas.DataFrame.pow, other, axis, level,
fill_value)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.pow(other=other,
axis=axis,
level=level,
fill_value=fill_value)
return self._create_dataframe_from_manager(new_manager)

def prod(self,
axis=None,
Expand Down Expand Up @@ -3596,8 +3692,16 @@ def sub(self, other, axis='columns', level=None, fill_value=None):
Returns:
A new DataFrame with the subtraciont applied.
"""
return self._operator_helper(pandas.DataFrame.sub, other, axis, level,
fill_value)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.sub(other=other,
axis=axis,
level=level,
fill_value=fill_value)
return self._create_dataframe_from_manager(new_manager)

def subtract(self, other, axis='columns', level=None, fill_value=None):
"""Alias for sub.
Expand Down Expand Up @@ -4022,8 +4126,16 @@ def truediv(self, other, axis='columns', level=None, fill_value=None):
Returns:
A new DataFrame with the Divide applied.
"""
return self._operator_helper(pandas.DataFrame.truediv, other, axis,
level, fill_value)
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")

other = self._validate_other(other, axis)
new_manager = self._data_manager.truediv(other=other,
axis=axis,
level=level,
fill_value=fill_value)
return self._create_dataframe_from_manager(new_manager)

def truncate(self, before=None, after=None, axis=None, copy=True):
raise NotImplementedError(
Expand Down Expand Up @@ -4669,35 +4781,20 @@ def _copartition(self, other, new_index):

return zip(new_partitions_self, new_partitions_other)

def _operator_helper(self, func, other, axis, level, *args):
    """Route a binary operation to the DataFrame or non-DataFrame helper.

    When ``other`` is a ``DataFrame`` the work goes to
    ``_inter_df_op_helper`` with an "outer" join; otherwise it goes to
    ``_single_df_op_helper``. In both cases ``func`` is wrapped so that
    ``axis``, ``level`` and ``*args`` are passed positionally.
    """
    if not isinstance(other, DataFrame):
        def apply_to_one(df):
            return func(df, other, axis, level, *args)

        return self._single_df_op_helper(apply_to_one, other, axis, level)

    def apply_to_pair(x, y):
        return func(x, y, axis, level, *args)

    return self._inter_df_op_helper(apply_to_pair, other, "outer", level)

def _inter_df_op_helper(self, func, other, how, level, inplace=False):
if level is not None:
raise NotImplementedError("Mutlilevel index not yet supported "
"in Pandas on Ray")
new_manager = self._data_manager.inter_manager_operations(other._data_manager, how, func)

def _create_dataframe_from_manager(self, new_manager, inplace=False):
    """Wrap ``new_manager`` in a new DataFrame, or install it on this one.

    Args:
        new_manager: Data manager holding the result.
        inplace: When falsy, a new ``DataFrame`` is built and returned.
            When truthy, ``_update_inplace`` is called on this object.

    Returns:
        A new ``DataFrame`` when ``inplace`` is falsy, otherwise ``None``.
    """
    if inplace:
        self._update_inplace(new_manager=new_manager)
        return None
    return DataFrame(data_manager=new_manager)

def _single_df_op_helper(self, func, other, axis, level):
if level is not None:
raise NotImplementedError("Multilevel index not yet supported "
"in Pandas on Ray")
def _validate_other(self, other, axis):
"""Helper method to check validity of other in inter-df operations"""
axis = pandas.DataFrame()._get_axis_number(axis)

if is_list_like(other):
if isinstance(other, DataFrame):
return other._data_manager
elif is_list_like(other):
if axis == 0:
if len(other) != len(self.index):
raise ValueError(
Expand All @@ -4708,8 +4805,8 @@ def _single_df_op_helper(self, func, other, axis, level):
raise ValueError(
"Unable to coerce to Series, length must be {0}: "
"given {1}".format(len(self.columns), len(other)))
return other

return DataFrame(data_manager=self._data_manager.scalar_operations(axis, other, func))


@ray.remote
Expand Down