diff --git a/modin/data_management/data_manager.py b/modin/data_management/data_manager.py index 43c4e922574..770399399f2 100644 --- a/modin/data_management/data_manager.py +++ b/modin/data_management/data_manager.py @@ -599,7 +599,7 @@ def query(self, expr, **kwargs): cls = type(self) columns = self.columns - def query_builder(df): + def query_builder(df, **kwargs): # This is required because of an Arrow limitation # TODO revisit for Arrow error df = df.copy() @@ -616,6 +616,33 @@ def query_builder(df): return cls(new_data, new_index, self.columns) + def eval(self, expr, **kwargs): + cls = type(self) + columns = self.columns + + def eval_builder(df, **kwargs): + df.columns = columns + result = df.eval(expr, inplace=False, **kwargs) + # If result is a series, expr was not an assignment expression. + if not isinstance(result, pandas.Series): + result.columns = pandas.RangeIndex(0, len(result.columns)) + return result + + func = self._prepare_method(eval_builder, **kwargs) + new_data = self.map_across_full_axis(1, func) + + # eval can update the columns, so we must update columns + columns_copy = pandas.DataFrame(columns=columns) + columns_copy = columns_copy.eval(expr, inplace=False, **kwargs) + if isinstance(columns_copy, pandas.Series): + # To create a data manager, we need the + # columns to be in a list-like + columns = list(columns_copy.name) + else: + columns = columns_copy.columns + + return cls(new_data, self.index, columns) + def quantile_for_list_of_values(self, **kwargs): cls = type(self) axis = kwargs.get("axis", 0) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 704c290f7c1..e65c890f7fc 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -1525,36 +1525,14 @@ def eval(self, expr, inplace=False, **kwargs): ndarray, numeric scalar, DataFrame, Series """ self._validate_eval_query(expr, **kwargs) - - columns = self.columns - - def eval_helper(df): - df.columns = columns - result = df.eval(expr, inplace=False, **kwargs) - # If result is a series, expr was not an assignment expression. - if not isinstance(result, pandas.Series): - result.columns = pandas.RangeIndex(0, len(result.columns)) - return result - inplace = validate_bool_kwarg(inplace, "inplace") - new_rows = _map_partitions(eval_helper, self._row_partitions) - result_type = ray.get( - _deploy_func.remote(lambda df: type(df), new_rows[0])) - if result_type is pandas.Series: - new_series = pandas.concat(ray.get(new_rows), axis=0, copy=False) - new_series.index = self.index - return new_series - - columns_copy = self._col_metadata._coord_df.copy().T - columns_copy.eval(expr, inplace=True, **kwargs) - columns = columns_copy.columns + data_manager = self._data_manager.eval(expr, **kwargs) if inplace: - self._update_inplace( - row_partitions=new_rows, columns=columns, index=self.index) + self._update_inplace(new_manager=data_manager) else: - return DataFrame(columns=columns, row_partitions=new_rows) + return DataFrame(data_manager=data_manager) def ewm(self, com=None, @@ -2877,6 +2855,7 @@ def query(self, expr, inplace=False, **kwargs): A new DataFrame if inplace=False """ self._validate_eval_query(expr, **kwargs) + inplace = validate_bool_kwarg(inplace, "inplace") new_manager = self._data_manager.query(expr, **kwargs) @@ -3224,9 +3203,11 @@ def sample(self, else 0 if axis == 0: - axis_length = len(self._row_metadata) + axis_labels = self._data_manager.index + axis_length = len(axis_labels) else: - axis_length = len(self._col_metadata) + axis_labels = self._data_manager.column + axis_length = len(axis_labels) if weights is not None: @@ -3304,15 +3285,6 @@ def sample(self, columns=[] if axis == 1 else self.columns, index=self.index if axis == 1 else []) - if axis == 1: - axis_labels = self.columns - partition_metadata = self._col_metadata - partitions = self._col_partitions - else: - axis_labels = self.index - partition_metadata = self._row_metadata - partitions = self._row_partitions - if random_state is not None: # Get a random number generator depending on the type of # random_state that is passed in @@ -3327,36 +3299,20 @@ def sample(self, # choose random numbers and then get corresponding labels from # chosen axis - sample_indices = random_num_gen.randint( - low=0, high=len(partition_metadata), size=n) + sample_indices = random_num_gen.choice( + np.arange(0, axis_length), size=n, replace=replace) samples = axis_labels[sample_indices] else: # randomly select labels from chosen axis samples = np.random.choice( a=axis_labels, size=n, replace=replace, p=weights) - # create an array of (partition, index_within_partition) tuples for - # each sample - part_ind_tuples = [partition_metadata[sample] for sample in samples] - if axis == 1: - # tup[0] refers to the partition number and tup[1] is the index - # within that partition - new_cols = [ - _deploy_func.remote(lambda df: df.iloc[:, [tup[1]]], - partitions[tup[0]]) - for tup in part_ind_tuples - ] - return DataFrame( - col_partitions=new_cols, columns=samples, index=self.index) + data_manager = self._data_manager.getitem_col_array(samples) + return DataFrame(data_manager=data_manager) else: - new_rows = [ - _deploy_func.remote(lambda df: df.loc[[tup[1]]], - partitions[tup[0]]) - for tup in part_ind_tuples - ] - return DataFrame( - row_partitions=new_rows, columns=self.columns, index=samples) + data_manager = self._data_manager.getitem_row_array(samples) + return DataFrame(data_manager=data_manager) def select(self, crit, axis=0): raise NotImplementedError( diff --git a/modin/pandas/test/test_dataframe.py b/modin/pandas/test/test_dataframe.py index 99db0b9903c..9ae6c2c7ced 100644 --- a/modin/pandas/test/test_dataframe.py +++ b/modin/pandas/test/test_dataframe.py @@ -63,6 +63,7 @@ def test_int_dataframe(): filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'} + test_sample(ray_df, pandas_df) test_filter(ray_df, pandas_df, filter_by) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -233,6 +234,7 @@ def test_float_dataframe(): filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'} + test_sample(ray_df, pandas_df) test_filter(ray_df, pandas_df, filter_by) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -402,6 +404,7 @@ def test_mixed_dtype_dataframe(): filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'} + test_sample(ray_df, pandas_df) test_filter(ray_df, pandas_df, filter_by) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -569,6 +572,7 @@ def test_nan_dataframe(): filter_by = {'items': ['col1', 'col5'], 'regex': '4$|3$', 'like': 'col'} + test_sample(ray_df, pandas_df) test_filter(ray_df, pandas_df, filter_by) test_index(ray_df, pandas_df) test_size(ray_df, pandas_df) @@ -1529,6 +1533,18 @@ def test_eval_df_use_case(): frame_data = {'a': np.random.randn(10), 'b': np.random.randn(10)} df = pandas.DataFrame(frame_data) ray_df = pd.DataFrame(frame_data) + + # Very hacky test to test eval while inplace is not working + tmp_pandas = df.eval( + "e = arctan2(sin(a), b)", + engine='python', + parser='pandas') + tmp_ray = ray_df.eval( + "e = arctan2(sin(a), b)", + engine='python', + parser='pandas') + assert ray_df_equals_pandas(tmp_ray, tmp_pandas) + df.eval( "e = arctan2(sin(a), b)", engine='python', @@ -1555,6 +1571,24 @@ def test_eval_df_arithmetic_subexpression(): assert ray_df_equals_pandas(ray_df, df) +def test_eval_df_series_result(): + frame_data = {'a': np.random.randn(10), 'b': np.random.randn(10)} + df = pandas.DataFrame(frame_data) + ray_df = pd.DataFrame(frame_data) + + # Very hacky test to test eval while inplace is not working + tmp_pandas = df.eval( + "arctan2(sin(a), b)", + engine='python', + parser='pandas') + tmp_ray = ray_df.eval( + "arctan2(sin(a), b)", + engine='python', + parser='pandas') + assert ray_df_equals_pandas(tmp_ray, tmp_pandas) + assert isinstance(to_pandas(tmp_ray), pandas.Series) + + def test_ewm(): ray_df = create_test_dataframe() @@ -2865,10 +2899,19 @@ def test_rtruediv(): test_inter_df_math_right_ops("rtruediv") -def test_sample(): - ray_df = create_test_dataframe() - assert len(ray_df.sample(n=4)) == 4 - assert len(ray_df.sample(frac=0.5)) == 2 +@pytest.fixture +def test_sample(ray_df, pd_df): + with pytest.raises(ValueError): + ray_df.sample(n=3, frac=0.4) + + assert ray_df_equals_pandas( + ray_df.sample(frac=0.5, random_state=42), + pd_df.sample(frac=0.5, random_state=42) + ) + assert ray_df_equals_pandas( + ray_df.sample(n=2, random_state=42), + pd_df.sample(n=2, random_state=42) + ) def test_select():