diff --git a/python/ray/dataframe/dataframe.py b/python/ray/dataframe/dataframe.py index a178997b0793..cd6d1dd40c6b 100644 --- a/python/ray/dataframe/dataframe.py +++ b/python/ray/dataframe/dataframe.py @@ -4836,10 +4836,6 @@ def del_helper(df, to_delete): df.columns = pd.RangeIndex(0, len(df.columns)) return df - to_delete = self.columns.get_loc(key) - self._row_partitions = _map_partitions( - del_helper, self._row_partitions, to_delete) - # This structure is used to get the correct index inside the partition. del_df = self._col_metadata[key] @@ -4852,15 +4848,18 @@ def del_helper(df, to_delete): # Cast cols as pd.Series as duplicate columns mean result may be # np.int64 or pd.Series col_parts_to_del = \ - pd.Series(self._col_metadata[key, 'partition']).unique() + pd.Series(del_df['partition'].copy()).unique() self._col_metadata.drop(key) + for i in col_parts_to_del: # Compute the correct index inside the partition to delete. to_delete_in_partition = \ del_df[del_df['partition'] == i]['index_within_partition'] - self._col_partitions[i] = _deploy_func.remote( - del_helper, self._col_partitions[i], to_delete_in_partition) + for j in range(self._block_partitions.shape[0]): + self._block_partitions[j, i] = _deploy_func.remote( + del_helper, self._block_partitions[j, i], + to_delete_in_partition) self._col_metadata.reset_partition_coords(col_parts_to_del) diff --git a/python/ray/dataframe/index_metadata.py b/python/ray/dataframe/index_metadata.py index 11c23d8855cc..ad115f5a8910 100644 --- a/python/ray/dataframe/index_metadata.py +++ b/python/ray/dataframe/index_metadata.py @@ -203,15 +203,15 @@ def reset_partition_coords(self, partitions=None): # updated as well. try: self._coord_df.loc[partition_mask, - 'index_within_partition'] = [ - p for p in range(sum(partition_mask))] + 'index_within_partition'] = np.arange( + sum(partition_mask)).astype(int) except ValueError: # Copy the arrow sealed dataframe so we can mutate it. # We only do this the first time we try to mutate the sealed. self._coord_df = self._coord_df.copy() self._coord_df.loc[partition_mask, - 'index_within_partition'] = [ - p for p in range(sum(partition_mask))] + 'index_within_partition'] = np.arange( + sum(partition_mask)).astype(int) def insert(self, key, loc=None, partition=None, index_within_partition=None): @@ -354,7 +354,14 @@ def drop(self, labels, errors='raise'): # Update first lengths to prevent possible length inconsistencies if isinstance(dropped, pd.DataFrame): - drop_per_part = dropped.groupby(["partition"]).size()\ + try: + drop_per_part = dropped.groupby(["partition"]).size()\ + .reindex(index=pd.RangeIndex(len(self._lengths)), + fill_value=0) + except ValueError: + # Copy the arrow sealed dataframe so we can mutate it. + dropped = dropped.copy() + drop_per_part = dropped.groupby(["partition"]).size()\ .reindex(index=pd.RangeIndex(len(self._lengths)), fill_value=0) elif isinstance(dropped, pd.Series): diff --git a/python/ray/dataframe/test/test_dataframe.py b/python/ray/dataframe/test/test_dataframe.py index 6cd72ec4b99e..0054e265cdae 100644 --- a/python/ray/dataframe/test/test_dataframe.py +++ b/python/ray/dataframe/test/test_dataframe.py @@ -3274,6 +3274,12 @@ def test___delitem__(ray_df, pd_df): pd_df.__delitem__('col1') ray_df_equals_pandas(ray_df, pd_df) + # Issue 2027 + last_label = pd_df.iloc[:, -1].name + ray_df.__delitem__(last_label) + pd_df.__delitem__(last_label) + ray_df_equals_pandas(ray_df, pd_df) + def test___finalize__(): ray_df = create_test_dataframe()