
Annotation-less P2P shuffling #7801

Merged: 28 commits into dask:main on May 11, 2023

Conversation

@hendrikmakait (Member) commented Apr 25, 2023

Closes #7716 by avoiding annotations altogether.

Core idea:

  • Instead of propagating information through annotations, we optimistically schedule output tasks on any worker, identify the correct worker, and reschedule the task onto that worker (a minimal sketch of this flow follows the checklist below).

Assumption:

  • Rescheduling each task at most once will not add significant overhead.

Open questions:

  • How do we deal with pre-existing worker restrictions that do not match the output worker we have chosen for the given partition?

Checklist:

  • Tests added / passed
  • Passes pre-commit run --all-files
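
The following is a minimal, self-contained sketch of that flow. The names (assign_output_worker, on_output_task_started) and the round-robin mapping are hypothetical, purely for illustration; this is not the distributed scheduler API.

from __future__ import annotations

def assign_output_worker(partition: int, workers: list[str]) -> str:
    """Deterministically map an output partition to a worker (illustrative round-robin)."""
    return workers[partition % len(workers)]

def on_output_task_started(partition: int, current_worker: str, workers: list[str]) -> str | None:
    """Return the worker to pin the task to if it landed on the wrong worker.

    The scheduler would then reschedule the task onto the returned worker;
    each output task is rescheduled at most once.
    """
    correct = assign_output_worker(partition, workers)
    if current_worker != correct:
        return correct  # pin the task to `correct` and reschedule it there
    return None  # already on the right worker; run it here

workers = ["tcp://w1", "tcp://w2", "tcp://w3"]
assert on_output_task_started(4, "tcp://w1", workers) == "tcp://w2"
assert on_output_task_started(4, "tcp://w2", workers) is None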

@github-actions bot (Contributor) commented Apr 25, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

25 files (±0)   25 suites (±0)   15h 59m 22s duration (+1h 7m 8s)
3,597 tests (+16): 3,489 passed (+20), 105 skipped (±0), 3 failed (-4)
44,210 runs (+1,110): 42,101 passed (+1,046), 2,105 skipped (+67), 4 failed (-3)

For more details on these failures, see this check.

Results for commit b276e79, compared against base commit 9d90792.

This pull request removes 14 and adds 30 tests. Note that renamed tests count towards both.
distributed.shuffle.tests.test_merge ‑ test_basic_merge[inner]
distributed.shuffle.tests.test_merge ‑ test_basic_merge[left]
distributed.shuffle.tests.test_merge ‑ test_basic_merge[outer]
distributed.shuffle.tests.test_merge ‑ test_basic_merge[right]
distributed.shuffle.tests.test_merge ‑ test_merge[inner]
distributed.shuffle.tests.test_merge ‑ test_merge[left]
distributed.shuffle.tests.test_merge ‑ test_merge[outer]
distributed.shuffle.tests.test_merge ‑ test_merge[right]
distributed.shuffle.tests.test_rechunk ‑ test_raise_on_fuse_optimization
distributed.shuffle.tests.test_rechunk ‑ test_raise_on_lost_annotation
…
distributed.shuffle.tests.test_merge ‑ test_basic_merge[all-inner]
distributed.shuffle.tests.test_merge ‑ test_basic_merge[all-left]
distributed.shuffle.tests.test_merge ‑ test_basic_merge[all-outer]
distributed.shuffle.tests.test_merge ‑ test_basic_merge[all-right]
distributed.shuffle.tests.test_merge ‑ test_basic_merge[none-inner]
distributed.shuffle.tests.test_merge ‑ test_basic_merge[none-left]
distributed.shuffle.tests.test_merge ‑ test_basic_merge[none-outer]
distributed.shuffle.tests.test_merge ‑ test_basic_merge[none-right]
distributed.shuffle.tests.test_merge ‑ test_basic_merge[some-inner]
distributed.shuffle.tests.test_merge ‑ test_basic_merge[some-left]
…

♻️ This comment has been updated with latest results.

@mrocklin (Member):

Cc @rjzamora and @d-v-b so that they're aware of this possibility

@hendrikmakait (Member, Author):

cc @wence-

@hendrikmakait (Member, Author):

Here are the results from an A/B test on this; it looks like there is no significant performance impact:
[Screenshot: A/B test results, 2023-04-27 16:54]

Comment on lines 131 to 132
if shuffle.run_id != run_id:
    raise RuntimeError()
Member:

I think we should also check whether the shuffle this run_id points to is still valid. We don't want stale requests to modify the restrictions.

Member Author:

We should probably document this somewhere, but if it's in ShuffleSchedulerExtension.states, it's an active and valid shuffle instance. Otherwise, it would have been dropped from there.
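
A minimal sketch of the combined check discussed here, assuming a hypothetical states mapping and ShuffleState class (the real ShuffleSchedulerExtension keeps richer state); it only illustrates rejecting requests both when the shuffle has been dropped and when the run_id is stale.

from dataclasses import dataclass

@dataclass
class ShuffleState:
    """Hypothetical stand-in for the scheduler-side state of one shuffle run."""
    run_id: int

def get_active_run(states: dict[str, ShuffleState], shuffle_id: str, run_id: int) -> ShuffleState:
    """Reject requests that reference a removed shuffle or a stale run."""
    state = states.get(shuffle_id)
    if state is None:
        # Dropped from `states` means it is no longer an active, valid shuffle.
        raise RuntimeError(f"Shuffle {shuffle_id!r} is no longer active")
    if state.run_id != run_id:
        raise RuntimeError(f"Stale run_id {run_id} for shuffle {shuffle_id!r}")
    return state

states = {"shuffle-abc": ShuffleState(run_id=2)}
assert get_active_run(states, "shuffle-abc", 2).run_id == 2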

part, workers, npartitions
)
for part in parts_out:
# TODO: How do we deal with pre-existing worker restrictions?
Member:

Why can't this logic be the same as before when it comes to existing worker restrictions?

Member Author:

I've adjusted the logic. The only unhandled situation I can imagine is a task being fused with a follow-up task that has worker restrictions of its own; in this case, we may ignore those. A fix would be to add a shuffle-internal mechanism for transferring the output partition to another worker.

@fjetter (Member) commented Apr 28, 2023

I think this all makes sense. I wonder if we still want to keep the annotations path around for the "happy case" where HLGs and annotations just work.

@wence- (Contributor) left a comment:

A few minor queries.

@@ -133,6 +133,7 @@ def test_raise_on_fuse_optimization():
dd.shuffle.shuffle(df, "x", shuffle="p2p")


@pytest.mark.xfail()
Contributor:

This test doesn't make sense any more after these changes, right? So the xfail is wrong and it should be deleted?

Member Author:

Yup, this is still work in progress; I've adjusted the test.

@@ -161,6 +161,7 @@ def test_raise_on_fuse_optimization():
rechunk(x, chunks=new, method="p2p")


@pytest.mark.xfail()
Contributor:

This is also no longer expected to raise an error, so the xfail is a bit of a misnomer, I think?

Member Author:

Yup, this is still work in progress; I've adjusted the test.

distributed/shuffle/_scheduler_extension.py (outdated; review thread resolved)
@hendrikmakait (Member, Author) commented May 3, 2023

How do we deal with pre-existing worker restrictions that do not match the output worker we have chosen for the given partition?

If the worker restrictions are applied to the shuffle, they apply to all tasks. Thus, we can use the worker restrictions of the barrier task to bootstrap our set of valid workers. Consequently, the restrictions on the output task and the set of valid workers should match.

As things stand right now, I cannot think of another scenario that would add worker restrictions to tasks. With task fusion, annotations are still getting lost (dask/dask#7036). If this changes, we need to revisit this.
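
An illustrative sketch of that bootstrapping; pick_output_workers and the worker addresses are made-up names, not the actual extension code.

def pick_output_workers(all_workers: list[str], barrier_restrictions: set[str]) -> list[str]:
    """Valid output workers: the barrier's worker restrictions if present, else every worker."""
    if barrier_restrictions:
        return [w for w in all_workers if w in barrier_restrictions]
    return list(all_workers)

assert pick_output_workers(
    ["tcp://w1", "tcp://w2", "tcp://w3"], {"tcp://w1", "tcp://w3"}
) == ["tcp://w1", "tcp://w3"]
assert pick_output_workers(["tcp://w1", "tcp://w2"], set()) == ["tcp://w1", "tcp://w2"]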

raise RuntimeError(
    f"Barrier task with key {key!r} does not exist. This may be caused by "
    "task fusion during graph generation. Please let us know that you ran "
    "into this by leaving a comment at distributed#7816."
Member Author:

XREF: #7816

@@ -238,10 +207,15 @@ async def test_rechunk_4d(c, s, *ws):
    new = ((10,),) * 4
    x2 = rechunk(x, chunks=new, method="p2p")
    assert x2.chunks == new
    assert np.all(await c.compute(x2) == a)
    # FIXME: distributed#7816
Member Author:

XREF: #7816

@hendrikmakait changed the title from "[RFC] Annotation-less P2P shuffling" to "Annotation-less P2P shuffling" on May 3, 2023
@hendrikmakait marked this pull request as ready for review on May 3, 2023 15:28
@fjetter (Member) left a comment:

Implementation looks good. There are a couple of questions around testing we should address before merging.

distributed/shuffle/_scheduler_extension.py (outdated; review thread resolved)
barrier = self.scheduler.tasks[barrier_key(id)]

if barrier.worker_restrictions:
    workers = list(barrier.worker_restrictions)
Member:

The implicit assumption here is that the barrier and output tasks are guaranteed to have the same restrictions. I suggest documenting this because it is a non-trivial conclusion, and depending on how future versions of fusion work, it may not even be true indefinitely.

Member Author:

I've added a docstring.

Comment on lines +52 to +93
class ShuffleAnnotationChaosPlugin(SchedulerPlugin):
    #: Rate at which the plugin randomly drops shuffle annotations
    rate: float
    scheduler: Scheduler | None
    seen: set

    def __init__(self, rate: float):
        self.rate = rate
        self.scheduler = None
        self.seen = set()

    async def start(self, scheduler: Scheduler) -> None:
        self.scheduler = scheduler

    def transition(
        self,
        key: str,
        start: TaskStateState,
        finish: TaskStateState,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        assert self.scheduler
        if finish != "waiting":
            return
        if not key.startswith("shuffle-barrier-"):
            return
        if key in self.seen:
            return

        self.seen.add(key)

        barrier = self.scheduler.tasks[key]

        if self._flip():
            barrier.annotations.pop("shuffle", None)
        for dt in barrier.dependents:
            if self._flip():
                dt.annotations.pop("shuffle", None)

    def _flip(self) -> bool:
        return random.random() < self.rate
Member:

I'm a bit torn about this. This is testing properties of the current implementation that I consider mostly accidental / nice-to-haves, but not hard requirements.

  • The implementation uses the annotations once during execution of the first shuffle task. Afterwards the annotations can be entirely forgotten.
  • It also tests that a partially annotated graph can be understood

I don't think any of these requirements are actually necessary, but this test now sets them as implicit requirements.
For instance, I think it is a fair assumption to say that either all tasks or no tasks are annotated. This is generally how fusion and our graph manipulation work. The fact that the implementation can handle a mixture is nice but not required.
I think it's also reasonable to say that annotations might be read during runtime and not just initially. I don't know when or how this would be useful, but I don't see why we should restrict ourselves to the current behavior.

I wouldn't want future development to be slowed down if they break these properties / because these tests fail.

@hendrikmakait what are your thoughts about this?

Member Author:

Generally, I'm also not a huge fan of this, but I think it is the most defensive way of testing that we do not rely on annotations. Let me disentangle your statement.

This is testing properties of the current implementation that I consider mostly accidental / nice to haves but not hard requirements.

This is true, but it is also the implementation of annotations that is broken. If it were not broken, we would not need to fix this.

For instance, I think it is a fair assumption to say that either all tasks or no tasks are annotated. This is generally how fusion and our graph manipulation works. The fact that the implementation can handle a mixture is nice but not required.

At the moment, I do not feel confident to make this claim. I suppose it's true, but I don't know what's broken with annotations at the moment. Also, this feels like an implementation-specific assumption. I could see fusion dropping annotations just for fused tasks if one does not pay attention. I would like to not have to test this, but here we are.

I think it's also reasonably to say that annotations might be read during runtime and not just initially. I don't know when or how this is useful but I don't see why we should restrict ourselves to the current behavior.

I don't see where we restrict ourselves here. We are stripping the barrier and its dependents of annotations once they arrive on the scheduler's state machine. This makes no assumption about when they will be read. The only assumption this makes is that all the ways that annotations are broken will strip them before the tasks arrive at the scheduler. I think this is a fair assumption.

I wouldn't want future development to be slowed down if they break these properties / because these tests fail.

Agreed.

Member:

At the moment, I do not feel confident to make this claim

Fair enough. It's true that it's hard to extrapolate what happens in the future.

I could see fusion dropping annotations just for fused tasks if one does not pay attention.

Indeed. My point is rather that if fusion happens, it should affect all output keys. However, this is also a generalization that may not be true.

I think it's also reasonable to say that annotations might be read during runtime and not just initially. I don't know when or how this would be useful, but I don't see why we should restrict ourselves to the current behavior.

I don't see where we restrict ourselves here.

Sorry, I see how my statement is not very clear. This thing mutates annotations as soon as the barrier is transitioned to a specific state. If we already read the annotations before that point, this modification will not have any effect.
If I'm not mistaken, this is also what happens in the current implementation: once the first transfer task is running, we evaluate the annotations and fix the mappings. This transition hook is only executed later, i.e., the mutated annotations will not have an effect on any of this.
If we later changed the behavior such that the annotations are evaluated later or multiple times, we would suddenly hit this test code, which may no longer make sense.

This is all a bit academic. I will not block merging because of this.

I wouldn't want future development to be slowed down if they break these properties / because these tests fail.

Agreed.

What I meant to say is that red tests often discourage people from changing something, and once this becomes legacy code, the next generation of developers may not fully understand whether this is required or just nice-to-have behavior.
I raised my concerns. If future developers run into this, they can follow the breadcrumbs to this thread =)

Comment on lines 918 to 921
await n.process.process.kill()
block_event.set()
with pytest.raises(RuntimeError):
    await fut
await block_event.set()

await fut
Member:

If this is now able to recover, we should add a couple of additional asserts:

  1. Assert that there is indeed a shuffle task on the dead worker
  2. Assert that the output result is what is expected

Member:

I'm surprised that this change allows us to rerun. Do you understand why this works? You also reduced the data / number of partitions, which makes me nervous that the now-dead worker just didn't run any shuffle tasks.

Member Author:

TL;DR: Things now work as they should have worked from the beginning.

I didn't understand why this test failed at first, but after some digging, I figured it out: this is a positive side effect of correctly cleaning up worker restrictions when removing a shuffle on the scheduler. Previously, some output tasks would remain pinned to the failed worker; since that worker isn't around anymore, workers trying to send data to it would fail to connect.

Member:

thanks for digging in

@j-bennet (Contributor) left a comment:

I don't know enough about annotations and what they are used for, but a simpler workflow is almost always better, and this looks simpler.

I would be interested to see whether this extra scheduling work (scheduling a task to a worker that may not be the right one, then having to reschedule) leads to any slowdowns in realistically large workflows.

distributed/reschedule.py (outdated; review thread resolved)
distributed/shuffle/_scheduler_extension.py (review thread resolved)
# This may occur if multiple barriers share the same output task,
# e.g. in a hash join.
return
ts.annotations["shuffle_original_restrictions"] = ts.worker_restrictions.copy()
Contributor:

Dummy question, sorry. The problem you're fixing is tasks occasionally losing annotations. AFAIK this applies to all annotations, not just shuffle annotations, and your ShuffleAnnotationChaosPlugin only kills the shuffle annotation. Is there a possibility that in a real-world scenario, this new shuffle_original_restrictions will be lost too?

Member Author:

Based on what we've seen, the current assumption is that annotations get lost before the tasks make it to the scheduler.
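
To make the scheduler-side bookkeeping concrete, here is a hedged sketch of the save/restore idea; TaskStub, pin_output_task, and unpin_output_task are illustrative names, and only the "shuffle_original_restrictions" key mirrors the quoted diff.

class TaskStub:
    """Minimal stand-in for a scheduler-side task state."""

    def __init__(self, worker_restrictions: set[str]):
        self.worker_restrictions = set(worker_restrictions)
        self.annotations: dict = {}

def pin_output_task(ts: TaskStub, output_worker: str) -> None:
    # Remember pre-existing restrictions on the scheduler so they survive even
    # if client-side annotations were lost before the graph was submitted.
    ts.annotations["shuffle_original_restrictions"] = ts.worker_restrictions.copy()
    ts.worker_restrictions = {output_worker}

def unpin_output_task(ts: TaskStub) -> None:
    # Restore the original restrictions when the shuffle is removed.
    original = ts.annotations.pop("shuffle_original_restrictions", None)
    if original is not None:
        ts.worker_restrictions = original

ts = TaskStub({"tcp://w1", "tcp://w2"})
pin_output_task(ts, "tcp://w3")
assert ts.worker_restrictions == {"tcp://w3"}
unpin_output_task(ts)
assert ts.worker_restrictions == {"tcp://w1", "tcp://w2"}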

@fjetter (Member) left a comment:

Good to go once CI is green-ish

@hendrikmakait (Member, Author):

It's not exactly green, but test failures appear to be unrelated, so I'll merge this in.

@hendrikmakait merged commit 21b70be into dask:main on May 11, 2023
milesgranger pushed a commit to milesgranger/distributed that referenced this pull request May 15, 2023
@mrocklin (Member):

@rjzamora my guess is that p2p shuffling should be doable in dask-expr now.

@rjzamora (Member):

my guess is that p2p shuffling should be doable in dask-expr now.

Nice! Thanks for this, @hendrikmakait!

Successfully merging this pull request may close these issues: [Tracking] Annotations-related issues with P2P