Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 126 additions & 10 deletions modin/data_management/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,11 @@ def diff(self, **kwargs):
return self.__constructor__(new_data, self.index, self.columns)

def dropna(self, **kwargs):
"""Returns a new DataManager with null values dropped along given axis.

Returns:
a new DataManager
"""
axis = kwargs.get("axis", 0)
subset = kwargs.get("subset")
thresh = kwargs.get("thresh")
Expand Down Expand Up @@ -1243,13 +1248,20 @@ def dropna(self, **kwargs):
return self.drop(index=rm_from_index, columns=rm_from_columns)

def eval(self, expr, **kwargs):
"""Returns a new DataManager with expr evaluated on columns.

Args:
expr: The string expression to evaluate.

Returns:
A new PandasDataManager with new columns after applying expr.
"""
inplace = kwargs.get("inplace", False)

columns = self.index if self._is_transposed else self.columns
index = self.columns if self._is_transposed else self.index

# Run eval on columns to determine result type
# Make a copy of columns and eval on the copy to determine if result type is series or not
columns_copy = pandas.DataFrame(columns=self.columns)
columns_copy = columns_copy.eval(expr, inplace=False, **kwargs)
expect_series = isinstance(columns_copy, pandas.Series)
Expand Down Expand Up @@ -1281,6 +1293,11 @@ def eval_builder(df, **kwargs):
return self.__constructor__(new_data, self.index, columns)

def mode(self, **kwargs):
"""Returns a new DataManager with modes calculated for each label along given axis.

Returns:
A new PandasDataManager with modes calculated.
"""
axis = kwargs.get("axis", 0)
func = self._prepare_method(pandas.DataFrame.mode, **kwargs)
new_data = self.map_across_full_axis(axis, func)
Expand All @@ -1300,8 +1317,11 @@ def mode(self, **kwargs):
return self.__constructor__(final_data, new_index, new_columns, self._dtype_cache)

def fillna(self, **kwargs):
"""Returns a new DataManager with null values filled by given values or according to given method.


Returns:
A new PandasDataManager with null values filled.
"""
axis = kwargs.get("axis", 0)
value = kwargs.get("value")

Expand Down Expand Up @@ -1550,6 +1570,14 @@ def from_pandas(cls, df, block_partitions_cls):

# __getitem__ methods
def getitem_single_key(self, key):
"""Get item for a single target index.

Args:
key: Target index by which to retrieve data.

Returns:
A new PandasDataManager.
"""
numeric_index = self.columns.get_indexer_for([key])

new_data = self.getitem_column_array([key])
Expand All @@ -1562,7 +1590,14 @@ def getitem_single_key(self, key):
return new_data.to_pandas()[key]

def getitem_column_array(self, key):
"""Get column data for target labels.

Args:
key: Target labels by which to retrieve data.

Returns:
A new PandasDataManager.
"""
# Convert to list for type checking
numeric_indices = list(self.columns.get_indexer_for(key))

Expand All @@ -1580,6 +1615,14 @@ def getitem(df, internal_indices=[]):
return self.__constructor__(result, self.index, new_columns, new_dtypes)

def getitem_row_array(self, key):
"""Get row data for target labels.

Args:
key: Target labels by which to retrieve data.

Returns:
A new PandasDataManager.
"""
# Convert to list for type checking
numeric_indices = list(self.index.get_indexer_for(key))

Expand All @@ -1600,8 +1643,15 @@ def delitem(self, key):
return self.drop(columns=[key])

def drop(self, index=None, columns=None):
"""Remove rows and/or columns matching the target index and column labels.

Args:
index: Target index to drop.
columns: Target columns to drop.

Returns:
A new PandasDataManager.
"""
if index is None:
new_data = self.data
new_index = self.index
Expand Down Expand Up @@ -1637,8 +1687,16 @@ def delitem(df, internal_indices=[]):
# return a new one from here and let the front end handle the inplace
# update.
def insert(self, loc, column, value):
"""Insert new column data.

Args:
loc: Insertion index.
column: Column labels to insert.
value: Dtype object values to insert.

Returns:
A new PandasDataManager with new data inserted.
"""
def insert(df, internal_indices=[]):
internal_idx = internal_indices[0]
df.insert(internal_idx, internal_idx, value, allow_duplicates=True)
Expand All @@ -1660,6 +1718,15 @@ def insert(df, internal_indices=[]):
# There is a wide range of behaviors that are supported, so a lot of the
# logic can get a bit convoluted.
def apply(self, func, axis, *args, **kwargs):
"""Apply func across given axis.

Args:
func: The function to apply.
axis: Target axis to apply the function along.

Returns:
A new PandasDataManager.
"""
if callable(func):
return self._callable_func(func, axis, *args, **kwargs)
elif isinstance(func, dict):
Expand All @@ -1670,6 +1737,15 @@ def apply(self, func, axis, *args, **kwargs):
pass

def _post_process_apply(self, result_data, axis, try_scale=True):
"""Recompute the index after applying function.

Args:
result_data: a BlockPartitions object.
axis: Target axis along which function was applied.

Returns:
A new PandasDataManager.
"""
if try_scale:
try:
index = self.compute_index(0, result_data, True)
Expand Down Expand Up @@ -1702,6 +1778,15 @@ def _post_process_apply(self, result_data, axis, try_scale=True):
return self.__constructor__(result_data, index, columns)

def _dict_func(self, func, axis, *args, **kwargs):
"""Apply function to certain indices across given axis.

Args:
func: The function to apply.
axis: Target axis to apply the function along.

Returns:
A new PandasDataManager.
"""
if "axis" not in kwargs:
kwargs["axis"] = axis

Expand All @@ -1728,7 +1813,15 @@ def dict_apply_builder(df, func_dict={}):
return full_result

def _list_like_func(self, func, axis, *args, **kwargs):
"""Apply list-like function across given axis.

Args:
func: The function to apply.
axis: Target axis to apply the function along.

Returns:
A new PandasDataManager.
"""
func_prepared = self._prepare_method(lambda df: df.apply(func, *args, **kwargs))
new_data = self.map_across_full_axis(axis, func_prepared)

Expand All @@ -1737,7 +1830,15 @@ def _list_like_func(self, func, axis, *args, **kwargs):
return self.__constructor__(new_data, new_index, self.columns)

def _callable_func(self, func, axis, *args, **kwargs):
"""Apply callable functions across given axis.

Args:
func: The callable to apply.
axis: Target axis to apply the function along.

Returns:
A new PandasDataManager.
"""
def callable_apply_builder(df, func, axis, index, *args, **kwargs):
if not axis:
df.index = index
Expand All @@ -1763,8 +1864,9 @@ def callable_apply_builder(df, func, axis, index, *args, **kwargs):
def _manual_repartition(self, axis, repartition_func, **kwargs):
"""This method applies all manual partitioning functions.

:param axis:
:param repartition_func:
Args:
axis: The axis to shuffle data along.
repartition_func: The function used to repartition data.

Returns:
A `BlockPartitions` object.
Expand All @@ -1787,6 +1889,14 @@ def groupby_agg_builder(df):
# END Manual Partitioning methods

def get_dummies(self, columns, **kwargs):
"""Convert categorical variables to dummy variables for certain columns.

Args:
columns: The columns to convert.

Returns:
A new PandasDataManager.
"""
cls = type(self)

# `columns` as None does not mean all columns, by default it means only
Expand Down Expand Up @@ -1899,9 +2009,12 @@ def global_idx_to_numeric_idx(self, axis, indices):
"""
Note: this function involves making copies of the index in memory.

:param axis:
:param indices:
:return:
Args:
axis: Axis to extract indices.
indices: Indices to convert to numerical.

Returns:
An Index object.
"""
assert axis in ['row', 'col', 'columns']
if axis == 'row':
Expand Down Expand Up @@ -1932,8 +2045,9 @@ def __init__(self,
index_map_series: pandas.Series=None,
columns_map_series: pandas.Series=None):
"""
:param index_map_series: a Pandas Series Object mapping user-facing index to numeric index.
:param columns_map_series: a Pandas Series Object mapping user-facing index to numeric index.
Args:
index_map_series: a Pandas Series Object mapping user-facing index to numeric index.
columns_map_series: a Pandas Series Object mapping user-facing column labels to numeric index.
"""
assert index_map_series is not None
assert columns_map_series is not None
Expand All @@ -1956,7 +2070,9 @@ def __constructor__(self, block_partitions_object: BlockPartitions, index: panda
def _get_data(self) -> BlockPartitions:
"""
Perform the map step
:return:

Returns:
A BlockPartitions object.
"""
def iloc(partition, row_internal_indices, col_internal_indices):
return partition.iloc[row_internal_indices, col_internal_indices]
Expand Down