PERF-#4182, FIX-#4059: Add cell-wise execution for binary ops, fix bin ops for empty dataframes #4391

Merged
merged 12 commits into from
Jun 16, 2022
3 changes: 2 additions & 1 deletion docs/release_notes/release_notes-0.16.0.rst
@@ -7,8 +7,9 @@ Key Features and Updates

* Stability and Bugfixes
* FIX-#4543: Fix `read_csv` in case skiprows=<0, []> (#4544)
* FIX-#4059: Add cell-wise execution for binary ops, fix bin ops for empty dataframes (#4391)
* Performance enhancements
*
* PERF-#4182: Add cell-wise execution for binary ops, fix bin ops for empty dataframes (#4391)
* Benchmarking enhancements
*
* Refactor Codebase
34 changes: 27 additions & 7 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -2507,18 +2507,38 @@ def binary_op(self, op, right_frame, join_type="outer"):
left_parts, right_parts, joined_index, row_lengths = self._copartition(
0, right_frame, join_type, sort=True
)
# unwrap list returned by `copartition`.
right_parts = right_parts[0]
new_frame = self._partition_mgr_cls.binary_operation(
1, left_parts, lambda l, r: op(l, r), right_parts
new_left_frame = self.__constructor__(
left_parts, joined_index, self.columns, row_lengths, self._column_widths
)
new_columns = self.columns.join(right_frame.columns, how=join_type)
new_right_frame = self.__constructor__(
right_parts[0],
joined_index,
right_frame.columns,
row_lengths,
right_frame._column_widths,
)

(
left_parts,
right_parts,
joined_columns,
column_widths,
) = new_left_frame._copartition(1, new_right_frame, join_type, sort=True)

new_frame = (
np.array([])
if len(left_parts) == 0 or len(right_parts[0]) == 0
else self._partition_mgr_cls.binary_operation(
left_parts, op, right_parts[0]
)
)

return self.__constructor__(
new_frame,
joined_index,
new_columns,
joined_columns,
row_lengths,
column_widths=self._column_widths_cache,
column_widths,
)
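The new `binary_op` flow — copartition along rows, rebuild both frames, copartition along columns, then zip partitions cell-wise — can be sketched with a toy model. This is only an illustration (plain nested lists of NumPy blocks and a no-op `copartition`), not Modin's real API:

```python
import numpy as np

def copartition(axis, left_parts, right_parts):
    # Toy stand-in for PandasDataframe._copartition: assume both operands
    # already share split points along `axis`, so aligning is a no-op here.
    return left_parts, right_parts

def binary_op(op, left_parts, right_parts):
    # Pass 1: align row splits (axis 0); pass 2: align column splits (axis 1).
    left_parts, right_parts = copartition(0, left_parts, right_parts)
    left_parts, right_parts = copartition(1, left_parts, right_parts)
    # Guard mirroring the empty-frame fix: no partitions -> empty result.
    if len(left_parts) == 0 or len(right_parts) == 0:
        return np.array([])
    # Cell-wise application: partition (i, j) on the left pairs with
    # partition (i, j) on the right, with no axis-wide repartitioning.
    return [
        [op(l, r) for l, r in zip(lrow, rrow)]
        for lrow, rrow in zip(left_parts, right_parts)
    ]

left = [[np.ones((2, 2)), np.ones((2, 1))]]
right = [[np.full((2, 2), 2.0), np.full((2, 1), 2.0)]]
result = binary_op(np.add, left, right)
print(result[0][0])  # every cell of the (0, 0) block is 1.0 + 2.0 = 3.0
```

After both copartition passes the two partition grids have identical shapes, which is what makes the simple nested `zip` sufficient.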

@lazy_metadata_decorator(apply_axis="both")
35 changes: 15 additions & 20 deletions modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -1242,44 +1242,39 @@ def compute_part_size(indexer, remote_part, part_idx, axis):

@classmethod
@wait_computations_if_benchmark_mode
def binary_operation(cls, axis, left, func, right):
def binary_operation(cls, left, func, right):
"""
Apply a function that requires two PandasDataframe objects.
Apply a function that requires two ``PandasDataframe`` objects.

Parameters
----------
axis : {0, 1}
The axis to apply the function over (0 - rows, 1 - columns).
left : np.ndarray
The partitions of left PandasDataframe.
The partitions of left ``PandasDataframe``.
func : callable
The function to apply.
right : np.ndarray
The partitions of right PandasDataframe.
The partitions of right ``PandasDataframe``.

Returns
-------
np.ndarray
A NumPy array with new partitions.
"""
if axis:
left_partitions = cls.row_partitions(left)
right_partitions = cls.row_partitions(right)
else:
left_partitions = cls.column_partitions(left)
right_partitions = cls.column_partitions(right)
[part.drain_call_queue() for part in right.flatten()]
Collaborator:

This looks like a case where we could pass the right partitions together with their respective call queues into the remote calls.

Contributor Author:

We can, but I don't see any benefit from applying the respective call queue of the right partition in the remote call here. That would be useful if we had lazy execution (though even that is questionable, as described below), but here we use `partition.apply`, which executes the remote calls right away.

On this line we start an early drain of the call queue in the background for each right partition and immediately materialize `right_partition._data`. Afterwards, during the serial `left_partition.apply` calls, the queued remote calls of each right partition will, with some probability, already have finished.

With your approach, draining the call queue of `right_partition` would happen only after draining the call queue of `left_partition` inside the remote call.

Also, by providing the right partition's call queue together with `right_partition._data` to `left_partition.apply`, we neither drain `right_partition.call_queue` nor materialize `right_partition._data`. If we work with the right dataframe further down in the code, we will run `drain_call_queue` again for each of its partitions. As a result, the same calls are executed twice in different places.

What benefits do you see in your suggestion, and how would we resolve the issue I described?

Collaborator:

These are certainly valid points, but there are other factors that can affect performance. For instance, if we are considering Ray, at least the following matter: 1. how many partitions the right array contains; 2. how much materialized data of the right partitions will end up in the in-process memory of the driver; 3. how much materialized data of the right partitions will end up in the plasma store.

We can experiment with this as part of a separate issue if we see good timings without passing the right partitions' call queues into the remote calls.

Contributor Author:

I still believe the main issue with the implementation you suggest is the duplication of processing for the right partitions: the first drain of the call queue happens in the remote call from the left partition, and the second happens for the right partition directly in another part of the code.

I mean the following flow:

df1 = pd.DataFrame(...)
df2 = pd.DataFrame(...).T  # Add a transpose operation to the call queue of df2's parts

result = df1 + df2  # Perform cell-wise execution. Each remote call for a left partition also runs the transpose for the right part. The parts of df2 aren't updated.
print(df2)  # Drain the call queue of df2's parts (the transpose operation) a second time

So I suggest experimenting with this new execution flow in a separate issue, because it possibly requires a lot of architectural changes.


func = cls.preprocess_func(func)
result = np.array(
return np.array(
[
left_partitions[i].apply(
func,
num_splits=NPartitions.get(),
other_axis_partition=right_partitions[i],
)
for i in range(len(left_partitions))
[
part.apply(
func,
right[row_idx][col_idx]._data,
)
for col_idx, part in enumerate(left[row_idx])
]
for row_idx in range(len(left))
]
)
return result if axis else result.T
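The thread above hinges on what draining a call queue does. Here is a toy model (illustrative names, not Modin's real partition classes) of the eager-drain strategy used in this PR: materialize the right operand's queued operations once, up front, so each `apply` on the left sees finished data:

```python
class ToyPartition:
    """Minimal stand-in for a Modin partition: data plus deferred calls."""

    def __init__(self, data):
        self._data = data
        self.call_queue = []

    def add_to_apply_calls(self, func):
        # Defer `func` instead of running it now (lazy execution).
        self.call_queue.append(func)
        return self

    def drain_call_queue(self):
        # Run all deferred calls and materialize the result in _data.
        for func in self.call_queue:
            self._data = func(self._data)
        self.call_queue.clear()

    def apply(self, func, other_data):
        # Eager execution: drain our own queue, then apply `func`.
        self.drain_call_queue()
        return ToyPartition(func(self._data, other_data))

left = [[ToyPartition(1), ToyPartition(2)]]
right = [[ToyPartition(10).add_to_apply_calls(lambda d: d * 2),
          ToyPartition(20).add_to_apply_calls(lambda d: d * 2)]]

# Eager drain of the right operand, mirroring
# `[part.drain_call_queue() for part in right.flatten()]` in the diff:
# each right partition's data is materialized once and stays materialized.
for row in right:
    for part in row:
        part.drain_call_queue()

result = [
    [part.apply(lambda l, r: l + r, right[i][j]._data)
     for j, part in enumerate(row)]
    for i, row in enumerate(left)
]
print([[p._data for p in row] for row in result])  # [[21, 42]]
```

Because the drain already emptied each right partition's queue, later uses of the right dataframe do not re-run the deferred operations — the double-work scenario described in the thread.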

@classmethod
@wait_computations_if_benchmark_mode
@@ -525,14 +525,12 @@ def apply_func_to_indices_both_axis(

@classmethod
@progress_bar_wrapper
def binary_operation(cls, axis, left, func, right):
def binary_operation(cls, left, func, right):
"""
Apply a function that requires partitions of two ``PandasOnRayDataframe`` objects.

Parameters
----------
axis : {0, 1}
The axis to apply the function over (0 - rows, 1 - columns).
left : np.ndarray
The partitions of left ``PandasOnRayDataframe``.
func : callable
@@ -546,5 +544,5 @@ def binary_operation(cls, axis, left, func, right):
A NumPy array with new partitions.
"""
return super(PandasOnRayDataframePartitionManager, cls).binary_operation(
axis, left, func, right
left, func, right
)
37 changes: 37 additions & 0 deletions modin/pandas/test/dataframe/test_binary.py
@@ -249,3 +249,40 @@ def test_duplicate_indexes():
modin_df2, pandas_df2 = create_test_dfs({"a": data, "b": data})
df_equals(modin_df1 / modin_df2, pandas_df1 / pandas_df2)
df_equals(modin_df1 / modin_df1, pandas_df1 / pandas_df1)


@pytest.mark.parametrize("subset_operand", ["left", "right"])
def test_mismatched_col_partitions(subset_operand):
data = [0, 1, 2, 3]
modin_df1, pandas_df1 = create_test_dfs({"a": data, "b": data})
modin_df_tmp, pandas_df_tmp = create_test_dfs({"c": data})

modin_df2 = pd.concat([modin_df1, modin_df_tmp], axis=1)
pandas_df2 = pandas.concat([pandas_df1, pandas_df_tmp], axis=1)

if subset_operand == "right":
modin_res = modin_df2 + modin_df1
pandas_res = pandas_df2 + pandas_df1
else:
modin_res = modin_df1 + modin_df2
pandas_res = pandas_df1 + pandas_df2

df_equals(modin_res, pandas_res)


@pytest.mark.parametrize("empty_operand", ["right", "left", "both"])
def test_empty_df(empty_operand):
modin_df, pandas_df = create_test_dfs([0, 1, 2, 0, 1, 2])
modin_df_empty, pandas_df_empty = create_test_dfs()

if empty_operand == "right":
modin_res = modin_df + modin_df_empty
pandas_res = pandas_df + pandas_df_empty
elif empty_operand == "left":
modin_res = modin_df_empty + modin_df
pandas_res = pandas_df_empty + pandas_df
else:
modin_res = modin_df_empty + modin_df_empty
pandas_res = pandas_df_empty + pandas_df_empty

df_equals(modin_res, pandas_res)
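For reference, the behavior the empty-frame fix has to reproduce comes from pandas' label alignment: adding an empty frame yields the union of row and column labels with all-NaN values. A minimal pandas-only check of that semantics (assuming stock pandas behavior):

```python
import pandas as pd

df = pd.DataFrame([0, 1, 2, 0, 1, 2])
empty = pd.DataFrame()

res = df + empty
# Alignment keeps the union of row/column labels, so the result has
# df's shape, but every value is NaN because `empty` contributes nothing.
print(res.shape)               # (6, 1)
print(res.isna().all().all())  # True
```

This is why returning `np.array([])` from the partition layer is only half the story — the resulting frame must still carry the joined index and columns so the NaN-filled shape matches pandas.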