PERF-#4182, FIX-#4059: Add cell-wise execution for binary ops, fix bin ops for empty dataframes #4391
Conversation
Codecov Report
@@ Coverage Diff @@
## master #4391 +/- ##
==========================================
+ Coverage 86.22% 89.40% +3.18%
==========================================
Files 228 229 +1
Lines 18451 18723 +272
==========================================
+ Hits 15909 16740 +831
+ Misses 2542 1983 -559
@prutskov any updates here?
This PR is in a draft state, work is on hold.
(Resolved review comment on modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py)
Force-pushed from cdfc466 to 1727cc1
Cell-wise execution requires aligning the partitioning by rows and columns at the same time, which is expensive and sometimes not possible (when the left and right operands have significantly different shapes). Possibly that is the reason this was implemented as row-wise processing (#3100 (comment), @dchigarev).

After additional investigation it was found that Ray uses the same worker to execute several axis-partitions. In that case at least one of the remote tasks waits until the worker has finished its previous tasks. This can be observed for any Modin/Ray setup. Example:

import time
import ray

ray.init(num_cpus=16)  # We have 16 workers to distribute execution of remote tasks

import numpy as np
# import pandas as pd
import modin.pandas as pd
import modin.config as cfg
from modin.logging import get_logger

cfg.NPartitions.put(2)  # 2 axis partitions, which should be processed in different workers to run in parallel
cfg.BenchmarkMode.put(True)
cfg.LogMode.enable()
logger = get_logger()

def generate_data(nrows=5000, ncols=100):
    data = {f"col{i}": np.random.rand(nrows) for i in range(ncols)}
    return pd.DataFrame(data), pd.DataFrame(data)

if __name__ == "__main__":
    print("Generating data...")
    df1, df2 = generate_data(nrows=2_000_000, ncols=50)
    print(f"Original shape: {df1.shape}")
    t = time.time()
    logger.info("Start")
    # Internally we have 2 row-partitions, which are processed in the same
    # worker (print `pid` from the remote function call to see).
    result = df1 + df2
    logger.info("End")
    print(f"Time binary op: {time.time() - t} s")

With correct scheduling, the processing time of the axis-partitions could be reduced at least 2x. Has anyone faced these scheduling problems? CC @modin-project/modin-core, @modin-project/modin-contributors
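A minimal, self-contained sketch of the pid check mentioned above, using plain Ray with no Modin internals; the task body is just a stand-in assumption for a partition-level binary op:

import os
import time
import ray

ray.init(num_cpus=16)

@ray.remote
def partition_op(block):
    # Report which worker process runs this task.
    print("worker pid:", os.getpid())
    time.sleep(1)  # stand-in for the real partition-level work
    return block

# Two "axis partitions": if both lines print the same pid, the tasks ran
# serially on one worker instead of in parallel on two workers.
ray.get([partition_op.remote(i) for i in range(2)])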
@prutskov I am taking a look at this and have made some progress! I'll hopefully have an update within a few hours.
@prutskov I can reproduce that serial execution on my mac. I added a print statement to the remote function here and saw a single worker process executing both remote functions. However, on an Ubuntu EC2 instance (specs below) with more RAM, I got parallel execution. I also get parallel execution on my mac when I decrease the data size. What are the specs of the machine you're testing on?
My mac: macOS Monterey 12.2.1
The Ubuntu EC2 instance: AMI name: ubuntu/images/hvm-ssd/ubuntu-focal-20.04-amd64-server-20211129
Thank you @mvashishtha! Indeed, reducing the data size makes the execution parallel. Thanks a lot for investigating! I will continue investigating with this in mind.
My laptop: OS: Linux WSL2 5.10.16.3-microsoft-standard-WSL2
Force-pushed from 4a2a920 to 0090468
I think we have a good explanation of the Ray serial execution here: ray-project/ray#25448 (comment). @prutskov I think you saw the serial execution in your script here because of the default Ray object store size. In practice, do we expect the total size of active arguments to hit the limit of 70% of the plasma store? If so, we should look into how we can get away with using less plasma.
@mvashishtha, thank you for finding the explanation of this behavior!
It's a difficult question. I guess we can hit the limit of 70% of the plasma store for both cell-wise and axis-wise execution types when we have "large" data. For axis-wise operations, waiting for and executing the remaining operations will take more time than for the equivalent cell-wise ones (it all depends on the size of one remote task, which is bigger for axis-wise operations). For now I have no ideas on how to reduce the usage of plasma except increasing the size of the plasma store during Ray initialization. For binary operations, cell-wise execution will be used, because in that case the possible serial execution has a smaller impact if we reach the limit of 70% of the plasma store.
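For reference, a minimal sketch of the only mitigation mentioned above (growing the plasma store at Ray initialization); the 8 GB figure is an arbitrary assumption, not a recommendation:

import ray

# Reserve a larger object store (an assumed 8 GB here) so the total size of
# in-flight task arguments stays further below Ray's ~70% cap.
ray.init(num_cpus=16, object_store_memory=8 * 1024**3)

import modin.pandas as pd  # Modin picks up the already-initialized Ray instance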
As I understand the 70% limit, making binary operations cell-wise could help, but in a subtle way. The limit seems to be on the total size of arguments to all executing tasks. Right now, for your serial script above, we have 2 remote functions that each take 4 block partitions (a full column partition for each 2x2 partitioned frame). The total size of the arguments when we kick off the remote tasks is 4 block partitions. If instead we had 4 remote functions that each took 1 pair of blocks, we'd have 4 tasks, but the total size of all arguments would be the same.

However, when each task's argument size is smaller, Ray can pack more tasks in at once. For example, say your object store size is 10 GB, your total argument cap is 7 GB, and each of the 8 blocks is 1 GB. In the axis-partition case, we can only schedule one task at a time, because each task requires 4 GB (4 partitions) and both tasks together would be 8 GB, over the limit. In the block-partition case, we can schedule 3 of the 4 tasks at once, since each requires 2 GB and 3 tasks require 6 GB, which is under the limit. The smaller tasks can be "packed" tighter and closer to the limit, with 3x parallelism for most of the execution instead of no parallelism.

edit: but again, I don't know whether this argument limit is something we hit in practice with the size of object store that Modin sets by default. If not, we shouldn't take the limit into consideration in our design.
edit: made some math errors :)
edit: Also, in practice the object store probably holds many things besides arguments. How does that affect this problem 🤔? Does that mean we are even tighter on space for arguments?
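The packing arithmetic from the example above, spelled out; all sizes are the hypothetical figures used in that comment:

object_store_gb = 10
argument_cap_gb = 7           # ~70% of the object store
block_gb = 1                  # each of the 8 block partitions

axis_task_gb = 4 * block_gb   # one task per column partition: 4 blocks of arguments
cell_task_gb = 2 * block_gb   # one task per pair of blocks

print(argument_cap_gb // axis_task_gb)  # 1 -> axis-wise tasks run one at a time
print(argument_cap_gb // cell_task_gb)  # 3 -> three cell-wise tasks fit under the cap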
Force-pushed from 0090468 to 14d993a
I thought about the same example :)
I don't think we should take the limit into consideration for our implementations. The warning message from Ray about serial execution (mentioned in ray-project/ray#25448 (comment)) will be enough.
I think many other objects in the object store can exacerbate the problem, because then we have less space for active arguments and sequential execution will happen more often.
Performance results were attached in the PR description. Blocked by #4563
Force-pushed from 14d993a to 98b7f2d
Force-pushed from 98b7f2d to 04a7ef5
The performance looks great!
I am not sure the performance results are great and comprehensive yet. We should check all/most corner cases, such as applying a function to df and s, or s and df, for example, as well as at least 2 or 3 general scenarios.
right_partitions = cls.column_partitions(right)
func = cls.preprocess_func(func)
result = np.array(
    [part.drain_call_queue() for part in right.flatten()]
This is the case when we can pass the right partitions and the respective call queues in the remote calls.
We can, but I don't see any benefit from applying the respective call queue of the right partition in the remote call here. This would be useful if we had lazy execution (though even that is questionable, as described below), but here we use `partition.apply`, which runs the remote calls right away.

In this row we start an early, background execution of the call queue for each right partition and immediately actualize `right_partition._data`. Afterwards, during the serial logic of the `left_partition.apply` call for each partition, the remote calls from the call queue of each right partition will, with some probability, already have finished.

If we use your way, draining the call queue of a right partition will happen only after draining the call queue of the corresponding left partition in the remote call.

Also, by providing the call queue of the right partition together with `right_partition._data` in `left_partition.apply`, we don't drain `right_partition.call_queue` and don't actualize `right_partition._data`. If we work with the right dataframe further in the code, we will run `drain_call_queue` again for each partition of the right dataframe. As a result, we do the same work twice, in different places.

What benefits do you see from your suggestion? How would we resolve the issue I described in this case?
These are valid points, but on the other hand there are other factors that can affect the performance. For instance, if we are considering Ray, the following factors (but probably not limited to these) can affect the performance: 1. how many partitions are contained in the right array; 2. how much physical (materialized) data of the right partitions will be saved into the in-process memory of the driver; 3. how much physical (materialized) data of the right partitions will be saved into the plasma store.
We can experiment with this case as part of a separate issue if we see good times without passing the call queues of the right partitions into the remote calls.
I still believe the main issue with the implementation you suggest will be the duplicated processing of operations for the right partition (the first time when draining the call queue inside the remote call from the left partition, the second time directly on the right partition in another part of the code).
I mean the following flow:

df1 = pd.DataFrame(...)
df2 = pd.DataFrame(...).T  # Add a transpose operation to the call queue of the parts of df2
result = df1 + df2  # Perform cell-wise execution. Each remote call for a left partition also runs the transpose operation for the right part. The parts of df2 aren't updated.
print(df2)  # Drain the call queue for the parts of df2 (the transpose operation) again

So, I suggest experimenting with this new execution flow in a separate issue, because it possibly requires a lot of architectural changes.
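To make the duplicated-drain concern concrete, a toy, self-contained sketch of the two strategies; ToyPartition and both helper functions are illustrative stand-ins, not Modin's real partition API:

from typing import Callable, List

class ToyPartition:
    """Toy stand-in for a Modin block partition (not the real class)."""

    def __init__(self, data):
        self._data = data
        self.call_queue: List[Callable] = []

    def drain_call_queue(self):
        # Apply and clear queued operations, actualizing self._data.
        for fn in self.call_queue:
            self._data = fn(self._data)
        self.call_queue = []

    def apply(self, func, other):
        # In real Modin this would be a remote call; here it runs inline.
        self.drain_call_queue()
        return ToyPartition(func(self._data, other))

# Flow used in this PR: drain the right partitions up front so their _data is
# actualized once; only materialized data is passed to apply afterwards.
def binary_op_eager(left, right, func):
    for part in right:
        part.drain_call_queue()
    return [l.apply(func, r._data) for l, r in zip(left, right)]

# Alternative raised in review: replay the right call queue inside each call;
# r._data is never updated, so a later use of the right frame (e.g. print(df2))
# has to drain the same queue again.
def binary_op_deferred(left, right, func):
    def run(l, r):
        data = r._data
        for fn in r.call_queue:
            data = fn(data)
        return l.apply(func, data)
    return [run(l, r) for l, r in zip(left, right)]

def make_frames():
    left = [ToyPartition(1), ToyPartition(2)]
    right = [ToyPartition(10), ToyPartition(20)]
    for p in right:
        p.call_queue.append(lambda x: -x)  # pending operation, like the transpose above
    return left, right

l1, r1 = make_frames()
binary_op_eager(l1, r1, lambda a, b: a + b)
print([len(p.call_queue) for p in r1])    # [0, 0] -> right already actualized

l2, r2 = make_frames()
binary_op_deferred(l2, r2, lambda a, b: a + b)
print([len(p.call_queue) for p in r2])    # [1, 1] -> queue must be drained again later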
The performance results were added for the affected functionality.
Commit 859b079: PERF-#4182, FIX-#4059: Add cell-wise execution for binary ops, fix bin ops for empty dataframes. Signed-off-by: Alexey Prutskov <[email protected]>
Force-pushed from 9e90c5e to 859b079
The actual benchmark results for all affected functionality (binary ops, fillna, df.where, df.update) are attached in the PR description.
I agree that the Series+DF / DF+Series case should be implemented in a separate PR (as a resolution of #4578, probably). This type of operation forms its result differently from DF+DF and will require additional changes.
@prutskov, thanks for the changes and measurements, LGTM!
Signed-off-by: Alexey Prutskov [email protected]
What do these changes do?
The PR adds cell-wise execution for binary operations as an attempt to improve performance.

- flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py passes
- black --check modin/ asv_bench/benchmarks scripts/doc_checker.py passes
- commits are signed off (git commit -s)
- resolves "`add` fails with empty DataFrame argument #4059"
- docs/development/architecture.rst is up-to-date

Performance results
Hardware info: OS: Windows 11 21H2, CPU: AMD Ryzen 7 5800HS 3.2 GHz, 8 cores (16 threads), RAM: 16 GB DDR4 3200 MHz.

asv continuous -f 1.01 master HEAD -b TimeBinaryOp.time_binary_op --launch-method=spawn is used with different MODIN_NPARTITIONS:
- MODIN_NPARTITIONS=16 (default)
- MODIN_NPARTITIONS=4 (optimal number of partitions for cell-wise execution)
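A sketch of how the two configurations above could be launched; run_binary_op_bench is a hypothetical helper, and the asv_bench working directory is an assumption about the repo layout:

import os
import subprocess

def run_binary_op_bench(npartitions):
    # Run the asv benchmark above with a given number of Modin partitions.
    env = dict(os.environ, MODIN_NPARTITIONS=str(npartitions))
    subprocess.run(
        [
            "asv", "continuous", "-f", "1.01", "master", "HEAD",
            "-b", "TimeBinaryOp.time_binary_op", "--launch-method=spawn",
        ],
        cwd="asv_bench",
        env=env,
        check=True,
    )

for n in (16, 4):  # the two configurations compared in the results
    run_binary_op_bench(n)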
fillna, where binary_op is used as the internal implementation (asv bench TimeFillnaSeries.time_fillna). Parameters: self=Series, value has Series type:
- MODIN_NPARTITIONS=16 (default)
- MODIN_NPARTITIONS=4 (optimal number of partitions for cell-wise execution)

df.where, where binary_op is used as the internal implementation.
Benchmark code for `df.where` testing
Results:
- MODIN_NPARTITIONS=16 (default)
- MODIN_NPARTITIONS=4 (optimal number of partitions for cell-wise execution)

df.update, where binary_op is used as the internal implementation.
Benchmark code for `df.update` testing
Results:
- MODIN_NPARTITIONS=16 (default)
- MODIN_NPARTITIONS=4 (optimal number of partitions for cell-wise execution)
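For orientation, a rough sketch of the call shapes these benchmarks exercise; the data sizes and parameters here are assumptions, not the exact asv benchmark code:

import numpy as np
import modin.pandas as pd

df = pd.DataFrame(np.random.rand(100_000, 10))
other = pd.DataFrame(np.random.rand(100_000, 10))
s = pd.Series(np.random.rand(100_000))
fill_values = pd.Series(np.random.rand(100_000))

s.fillna(fill_values)        # TimeFillnaSeries-style case: Series filled with a Series
df.where(df > 0.5, other)    # df.where, implemented on top of binary_op
df.update(other)             # df.update, implemented on top of binary_op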