Re-implement shuffle using staging #1030
Conversation
Force-pushed from 574f766 to daab4b3
Codecov Report
Additional details and impacted files:

@@           Coverage Diff            @@
##        branch-22.12    #1030   +/- ##
==============================================
  Coverage           ?    0.00%
==============================================
  Files              ?       18
  Lines              ?     2252
  Branches           ?        0
==============================================
  Hits               ?        0
  Misses             ?     2252
  Partials           ?        0
==============================================

☔ View full report at Codecov.
A few minor comments, but no need to act if you feel it doesn't really make sense to.
Force-pushed from ab2a437 to 169931a
Force-pushed from 169931a to de769f5
Force-pushed from de769f5 to fea22b1
Mostly just some queries for understanding, with some relatively minor issues.
TBH: this code is quite hard to follow at this point, though that may be down to my unfamiliarity with it, especially around what is executing where.
@@ -147,6 +148,20 @@ async def _stop_ucp_listeners(session_state):
        del session_state["lf"]


async def _stage_keys(session_state: dict, name: str, keys: set):
    worker: Worker = session_state["worker"]
Can you add a docstring here?
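For readers of this thread, a docstring for _stage_keys might look something like the sketch below. This is a hedged illustration, not the merged code: the session_state["staging"] layout and the return value are assumptions.

from distributed import Worker

async def _stage_keys(session_state: dict, name: str, keys: set) -> set:
    """Stage the given keys on this worker.

    Records which of ``keys`` are present in the worker's data store
    under a named staging area in ``session_state``, so that a later
    explicit-comms task can access (and free) the underlying data.

    Parameters
    ----------
    session_state : dict
        Worker-local state for this explicit-comms session.
    name : str
        Name of the staging area.
    keys : set
        Keys to look for in the worker's data store.

    Returns
    -------
    set
        The subset of ``keys`` found on this worker.
    """
    worker: Worker = session_state["worker"]
    # Keep a reference to each locally available key's data so that it
    # survives a later `client.cancel()` of the originating collection.
    staged = {k: worker.data[k] for k in keys if k in worker.data}
    session_state.setdefault("staging", {})[name] = staged
    return set(staged)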
    for rank, out_part_ids in rank_to_out_part_ids.items():
        if rank != myrank:
            msg = {
                i: to_serialize(out_part_id_to_dataframe.pop(i))
                for i in (out_part_ids & out_part_id_to_dataframe.keys())
            }
            futures.append(eps[rank].write(msg))
OK: loop over the peers we need to communicate with, rather than over all endpoints.
        recv(eps, myrank, rank_to_out_part_ids, out_part_id_to_dataframe_list, proxify),
        send(eps, myrank, rank_to_out_part_ids, out_part_id_to_dataframe),
A change here is that you don't collect all the futures for all the tasks and gather them at once, but instead wrap them up in nested asyncio.gather calls. I don't expect this will change anything substantive about the performance, but it's worth noting.
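To illustrate the pattern being described (a generic asyncio sketch, not the PR's code): a flat gather over every future and nested gather calls run the same coroutines concurrently; the difference is mainly in how the results come back grouped.

import asyncio

async def send(i):
    # Stand-in for the real send coroutine.
    await asyncio.sleep(0)
    return ("sent", i)

async def recv(i):
    # Stand-in for the real recv coroutine.
    await asyncio.sleep(0)
    return ("recv", i)

async def flat():
    # Collect every future and gather them all at once.
    futures = [send(i) for i in range(3)] + [recv(i) for i in range(3)]
    return await asyncio.gather(*futures)

async def nested():
    # Wrap sends and recvs in their own gathers, then gather those.
    return await asyncio.gather(
        asyncio.gather(*(send(i) for i in range(3))),
        asyncio.gather(*(recv(i) for i in range(3))),
    )

print(asyncio.run(flat()))    # one flat list of six results
print(asyncio.run(nested()))  # two lists of three results each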
    # Finally, we concatenate the output dataframes into the final output partitions
    ret: List[DataFrame] = []
    for out_part_id, dataframe_list in out_part_id_to_dataframe_list.items():
Suggested change:
-    for out_part_id, dataframe_list in out_part_id_to_dataframe_list.items():
+    for dataframe_list in out_part_id_to_dataframe_list.values():
You never needed the id.
So probably a list comprehension here would be simpler:

ret = [
    proxify(dd_concat(dfs, ignore_index=ignore_index))
    for dfs in out_part_id_to_dataframe_list.values()
]
Good point, fixed in 6464eb6
    rank_to_inkeys = c.stage_keys(name=name, keys=df.__dask_keys__())
    c.client.cancel(df)  # Notice, since `df` has been staged, nothing is freed here.
This I do not understand. We just copy some keys around here as far as I can tell (and don't persist anything), so I'm not sure what this even does.
Added more doc in 7ea9f6f
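For readers following the thread: the gist, as I understand the added documentation, is that staging keeps worker-side references to the partition data, so cancelling the Dask collection only drops the scheduler's references. A toy sketch in plain Python (illustrative names, not dask-cuda code):

# Staging holds its own reference, so dropping the original reference
# (analogous to `c.client.cancel(df)`) does not free the data.
staging = {}

def stage_keys(name, data, keys):
    staging[name] = {k: data[k] for k in keys}

data = {("df", 0): [1, 2, 3], ("df", 1): [4, 5, 6]}
stage_keys("shuffle-1", data, {("df", 0), ("df", 1)})

del data  # like cancel(): the original reference goes away...
partitions = staging.pop("shuffle-1")  # ...but the staged data is still alive
print(partitions[("df", 0)])  # [1, 2, 3]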
Thanks for the review @wence- !
Thanks for the doc updates!
rerun tests
@gpucibot merge
Introduce staging in explicit-comms. The idea is to "stage" the keys of the input on the workers so that a later explicit-comms task can access and free the data associated with the keys.
Note that explicit-comms and this new staging approach are still experimental. If or when they reach a state where they provide a significant performance improvement over a range of workflows, the plan is to tighten up the API.
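As a rough usage sketch (hedged: the import path and the shuffle signature reflect my reading of the dask-cuda code around this release and may differ from the final API):

# Sketch: shuffling a Dask dataframe via the explicit-comms code path.
from dask import datasets
from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from dask_cuda.explicit_comms.dataframe.shuffle import shuffle

if __name__ == "__main__":
    cluster = LocalCUDACluster()
    client = Client(cluster)

    df = datasets.timeseries()  # any Dask dataframe

    # Internally, this stages the input partitions on the workers,
    # cancels the original collection (safe, because staging keeps
    # the data alive), and exchanges partitions directly between
    # workers before concatenating the final output partitions.
    shuffled = shuffle(df, column_names=["id"])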