Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion modin/data_management/data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ def query(self, expr, **kwargs):
cls = type(self)
columns = self.columns

def query_builder(df):
def query_builder(df, **kwargs):
# This is required because of an Arrow limitation
# TODO revisit for Arrow error
df = df.copy()
Expand All @@ -616,6 +616,33 @@ def query_builder(df):

return cls(new_data, new_index, self.columns)

def eval(self, expr, **kwargs):
    """Evaluate ``expr`` and return a new object of the same type as ``self``.

    Args:
        expr: Expression string handed to ``pandas.DataFrame.eval``.
        **kwargs: Extra keyword arguments forwarded to
            ``pandas.DataFrame.eval``; ``inplace=False`` is always passed.

    Returns:
        ``type(self)`` built from the evaluated data, the current index and
        the column labels that ``expr`` produces.
    """
    cls = type(self)
    columns = self.columns

    def eval_builder(df, **kwargs):
        # Restore the real column labels so that expr can refer to them.
        df.columns = columns
        result = df.eval(expr, inplace=False, **kwargs)
        # If result is a series, expr was not an assignment expression.
        if not isinstance(result, pandas.Series):
            result.columns = pandas.RangeIndex(0, len(result.columns))
        return result

    func = self._prepare_method(eval_builder, **kwargs)
    new_data = self.map_across_full_axis(1, func)

    # eval can update the columns, so replay expr on an empty frame that
    # carries only the labels to learn what the resulting labels are.
    columns_copy = pandas.DataFrame(columns=columns)
    columns_copy = columns_copy.eval(expr, inplace=False, **kwargs)
    if isinstance(columns_copy, pandas.Series):
        # The constructor needs a list-like of labels. Wrap the name in a
        # list instead of calling list() on it: list("ab") would split the
        # label into characters and list(None) raises TypeError.
        columns = [columns_copy.name]
    else:
        columns = columns_copy.columns

    return cls(new_data, self.index, columns)

def quantile_for_list_of_values(self, **kwargs):
cls = type(self)
axis = kwargs.get("axis", 0)
Expand Down
72 changes: 14 additions & 58 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1525,36 +1525,14 @@ def eval(self, expr, inplace=False, **kwargs):
ndarray, numeric scalar, DataFrame, Series
"""
self._validate_eval_query(expr, **kwargs)

columns = self.columns

def eval_helper(df):
df.columns = columns
result = df.eval(expr, inplace=False, **kwargs)
# If result is a series, expr was not an assignment expression.
if not isinstance(result, pandas.Series):
result.columns = pandas.RangeIndex(0, len(result.columns))
return result

inplace = validate_bool_kwarg(inplace, "inplace")
new_rows = _map_partitions(eval_helper, self._row_partitions)

result_type = ray.get(
_deploy_func.remote(lambda df: type(df), new_rows[0]))
if result_type is pandas.Series:
new_series = pandas.concat(ray.get(new_rows), axis=0, copy=False)
new_series.index = self.index
return new_series

columns_copy = self._col_metadata._coord_df.copy().T
columns_copy.eval(expr, inplace=True, **kwargs)
columns = columns_copy.columns
data_manager = self._data_manager.eval(expr, **kwargs)

if inplace:
self._update_inplace(
row_partitions=new_rows, columns=columns, index=self.index)
self._update_inplace(new_manager=data_manager)
else:
return DataFrame(columns=columns, row_partitions=new_rows)
return DataFrame(data_manager=data_manager)

def ewm(self,
com=None,
Expand Down Expand Up @@ -2877,6 +2855,7 @@ def query(self, expr, inplace=False, **kwargs):
A new DataFrame if inplace=False
"""
self._validate_eval_query(expr, **kwargs)
inplace = validate_bool_kwarg(inplace, "inplace")

new_manager = self._data_manager.query(expr, **kwargs)

Expand Down Expand Up @@ -3224,9 +3203,11 @@ def sample(self,
else 0

if axis == 0:
axis_length = len(self._row_metadata)
axis_labels = self._data_manager.index
axis_length = len(axis_labels)
else:
axis_length = len(self._col_metadata)
axis_labels = self._data_manager.column
axis_length = len(axis_labels)

if weights is not None:

Expand Down Expand Up @@ -3304,15 +3285,6 @@ def sample(self,
columns=[] if axis == 1 else self.columns,
index=self.index if axis == 1 else [])

if axis == 1:
axis_labels = self.columns
partition_metadata = self._col_metadata
partitions = self._col_partitions
else:
axis_labels = self.index
partition_metadata = self._row_metadata
partitions = self._row_partitions

if random_state is not None:
# Get a random number generator depending on the type of
# random_state that is passed in
Expand All @@ -3327,36 +3299,20 @@ def sample(self,

# choose random numbers and then get corresponding labels from
# chosen axis
sample_indices = random_num_gen.randint(
low=0, high=len(partition_metadata), size=n)
sample_indices = random_num_gen.choice(
np.arange(0, axis_length), size=n, replace=replace)
samples = axis_labels[sample_indices]
else:
# randomly select labels from chosen axis
samples = np.random.choice(
a=axis_labels, size=n, replace=replace, p=weights)

# create an array of (partition, index_within_partition) tuples for
# each sample
part_ind_tuples = [partition_metadata[sample] for sample in samples]

if axis == 1:
# tup[0] refers to the partition number and tup[1] is the index
# within that partition
new_cols = [
_deploy_func.remote(lambda df: df.iloc[:, [tup[1]]],
partitions[tup[0]])
for tup in part_ind_tuples
]
return DataFrame(
col_partitions=new_cols, columns=samples, index=self.index)
data_manager = self._data_manager.getitem_col_array(samples)
return DataFrame(data_manager=data_manager)
else:
new_rows = [
_deploy_func.remote(lambda df: df.loc[[tup[1]]],
partitions[tup[0]])
for tup in part_ind_tuples
]
return DataFrame(
row_partitions=new_rows, columns=self.columns, index=samples)
data_manager = self._data_manager.getitem_row_array(samples)
return DataFrame(data_manager=data_manager)

def select(self, crit, axis=0):
raise NotImplementedError(
Expand Down
51 changes: 47 additions & 4 deletions modin/pandas/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def test_int_dataframe():

filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'}

test_sample(ray_df, pandas_df)
test_filter(ray_df, pandas_df, filter_by)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
Expand Down Expand Up @@ -233,6 +234,7 @@ def test_float_dataframe():

filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'}

test_sample(ray_df, pandas_df)
test_filter(ray_df, pandas_df, filter_by)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
Expand Down Expand Up @@ -402,6 +404,7 @@ def test_mixed_dtype_dataframe():

filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'}

test_sample(ray_df, pandas_df)
test_filter(ray_df, pandas_df, filter_by)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
Expand Down Expand Up @@ -569,6 +572,7 @@ def test_nan_dataframe():

filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'}

test_sample(ray_df, pandas_df)
test_filter(ray_df, pandas_df, filter_by)
test_index(ray_df, pandas_df)
test_size(ray_df, pandas_df)
Expand Down Expand Up @@ -1529,6 +1533,18 @@ def test_eval_df_use_case():
frame_data = {'a': np.random.randn(10), 'b': np.random.randn(10)}
df = pandas.DataFrame(frame_data)
ray_df = pd.DataFrame(frame_data)

# Very hacky test to test eval while inplace is not working
tmp_pandas = df.eval(
"e = arctan2(sin(a), b)",
engine='python',
parser='pandas')
tmp_ray = ray_df.eval(
"e = arctan2(sin(a), b)",
engine='python',
parser='pandas')
assert ray_df_equals_pandas(tmp_ray, tmp_pandas)

df.eval(
"e = arctan2(sin(a), b)",
engine='python',
Expand All @@ -1555,6 +1571,24 @@ def test_eval_df_arithmetic_subexpression():
assert ray_df_equals_pandas(ray_df, df)


def test_eval_df_series_result():
    """A non-assignment expression must match pandas and yield a Series."""
    expression = "arctan2(sin(a), b)"
    eval_options = {"engine": 'python', "parser": 'pandas'}

    frame_data = {'a': np.random.randn(10), 'b': np.random.randn(10)}
    pandas_frame = pandas.DataFrame(frame_data)
    ray_frame = pd.DataFrame(frame_data)

    # Evaluate the same expression with both libraries, pandas first.
    expected = pandas_frame.eval(expression, **eval_options)
    actual = ray_frame.eval(expression, **eval_options)

    assert ray_df_equals_pandas(actual, expected)
    # Once converted back to pandas, the result has to be a Series.
    assert isinstance(to_pandas(actual), pandas.Series)


def test_ewm():
ray_df = create_test_dataframe()

Expand Down Expand Up @@ -2865,10 +2899,19 @@ def test_rtruediv():
test_inter_df_math_right_ops("rtruediv")


def test_sample():
    """Check the length of what ``sample`` returns for ``n`` and ``frac``."""
    frame = create_test_dataframe()

    picked_by_count = frame.sample(n=4)
    assert len(picked_by_count) == 4

    picked_by_fraction = frame.sample(frac=0.5)
    assert len(picked_by_fraction) == 2
# NOTE(review): this helper is decorated as a fixture but is also invoked
# directly by other tests in this file; newer pytest releases reject direct
# fixture calls -- confirm against the pytest version in use.
@pytest.fixture
def test_sample(ray_df, pd_df):
    """Compare ``sample`` on ``ray_df`` with pandas for a fixed seed.

    Args:
        ray_df: Frame under test.
        pd_df: pandas frame used as the reference.
    """
    # Passing both n and frac must be rejected.
    with pytest.raises(ValueError):
        ray_df.sample(n=3, frac=0.4)

    # With the same random_state both libraries must draw the same sample.
    for size_kwargs in ({"frac": 0.5}, {"n": 2}):
        assert ray_df_equals_pandas(
            ray_df.sample(random_state=42, **size_kwargs),
            pd_df.sample(random_state=42, **size_kwargs)
        )


def test_select():
Expand Down