Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions python/ray/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4836,10 +4836,6 @@ def del_helper(df, to_delete):
df.columns = pd.RangeIndex(0, len(df.columns))
return df

to_delete = self.columns.get_loc(key)
self._row_partitions = _map_partitions(
del_helper, self._row_partitions, to_delete)

# This structure is used to get the correct index inside the partition.
del_df = self._col_metadata[key]

Expand All @@ -4852,15 +4848,18 @@ def del_helper(df, to_delete):
# Cast cols as pd.Series as duplicate columns mean result may be
# np.int64 or pd.Series
col_parts_to_del = \
pd.Series(self._col_metadata[key, 'partition']).unique()
pd.Series(del_df['partition'].copy()).unique()
Copy link
Contributor Author

@simon-mo simon-mo May 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy is necessary because unique will mutate the underlying data (🤔). If the underlying data comes from an Arrow-sealed object, a "Buffer is read-only" exception will be raised.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of the caching and memoizing that Pandas does mutates the objects. In the future, hopefully we can get rid of the need for the copy.

self._col_metadata.drop(key)

for i in col_parts_to_del:
# Compute the correct index inside the partition to delete.
to_delete_in_partition = \
del_df[del_df['partition'] == i]['index_within_partition']

self._col_partitions[i] = _deploy_func.remote(
del_helper, self._col_partitions[i], to_delete_in_partition)
for j in range(self._block_partitions.shape[0]):
self._block_partitions[j, i] = _deploy_func.remote(
del_helper, self._block_partitions[j, i],
to_delete_in_partition)

self._col_metadata.reset_partition_coords(col_parts_to_del)

Expand Down
17 changes: 12 additions & 5 deletions python/ray/dataframe/index_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,15 @@ def reset_partition_coords(self, partitions=None):
# updated as well.
try:
self._coord_df.loc[partition_mask,
'index_within_partition'] = [
p for p in range(sum(partition_mask))]
'index_within_partition'] = np.arange(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous method prevented consecutive del calls:

del df['col1']
del df['col2'] # Fails because the col_metadata contains floats

It seems setting with loc requires extra caution here. That's why I added type casting.

sum(partition_mask)).astype(int)
except ValueError:
# Copy the arrow sealed dataframe so we can mutate it.
# We only do this the first time we try to mutate the sealed.
self._coord_df = self._coord_df.copy()
self._coord_df.loc[partition_mask,
'index_within_partition'] = [
p for p in range(sum(partition_mask))]
'index_within_partition'] = np.arange(
sum(partition_mask)).astype(int)

def insert(self, key, loc=None, partition=None,
index_within_partition=None):
Expand Down Expand Up @@ -354,7 +354,14 @@ def drop(self, labels, errors='raise'):

# Update first lengths to prevent possible length inconsistencies
if isinstance(dropped, pd.DataFrame):
drop_per_part = dropped.groupby(["partition"]).size()\
try:
drop_per_part = dropped.groupby(["partition"]).size()\
.reindex(index=pd.RangeIndex(len(self._lengths)),
fill_value=0)
except ValueError:
# Copy the arrow sealed dataframe so we can mutate it.
dropped = dropped.copy()
drop_per_part = dropped.groupby(["partition"]).size()\
.reindex(index=pd.RangeIndex(len(self._lengths)),
fill_value=0)
elif isinstance(dropped, pd.Series):
Expand Down
6 changes: 6 additions & 0 deletions python/ray/dataframe/test/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -3274,6 +3274,12 @@ def test___delitem__(ray_df, pd_df):
pd_df.__delitem__('col1')
ray_df_equals_pandas(ray_df, pd_df)

# Issue 2027
last_label = pd_df.iloc[:, -1].name
ray_df.__delitem__(last_label)
pd_df.__delitem__(last_label)
ray_df_equals_pandas(ray_df, pd_df)


def test___finalize__():
ray_df = create_test_dataframe()
Expand Down