Documentation for Fine Performance Metrics and Spans (#7945)
crusaderky authored Jul 6, 2023
1 parent 566fd1f commit 8e6c287
Showing 14 changed files with 420 additions and 65 deletions.
2 changes: 2 additions & 0 deletions distributed/__init__.py
@@ -63,6 +63,7 @@
from distributed.scheduler import KilledWorker, Scheduler
from distributed.security import Security
from distributed.semaphore import Semaphore
from distributed.spans import span
from distributed.threadpoolexecutor import rejoin
from distributed.utils import CancelledError, TimeoutError, sync
from distributed.variable import Variable
@@ -174,6 +175,7 @@ def _():
"rejoin",
"rpc",
"secede",
"span",
"sync",
"wait",
"warn",
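With this change, ``span`` can be imported directly from the package root. A minimal usage sketch (an illustration, not part of this diff), mirroring the docstring example further down and assuming a throwaway in-process cluster:

    import dask.array as da
    from distributed import Client, span

    client = Client(processes=False)  # assumption: local cluster just for illustration

    with span("my workflow"):
        with span("phase 1"):
            a = da.random.random(10)
            b = a + 1
        with span("phase 2"):
            c = b * 2
    d = c.sum()       # built outside the span; attaches to the default span
    print(d.compute())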
3 changes: 2 additions & 1 deletion distributed/active_memory_manager.py
@@ -15,7 +15,8 @@
import dask
from dask.utils import parse_timedelta

# Needed to avoid Sphinx WARNING: more than one target found for cross-reference 'TaskState' and 'WorkerState'"
# Needed to avoid Sphinx WARNING: more than one target found for cross-reference
# 'TaskState' and 'WorkerState'"
# https://github.com/agronholm/sphinx-autodoc-typehints#dealing-with-circular-imports
from distributed import client
from distributed import scheduler as scheduler_module
6 changes: 6 additions & 0 deletions distributed/dashboard/components/scheduler.py
@@ -3632,6 +3632,12 @@ def _build_data_sources(self) -> None:

# Custom metrics won't necessarily contain a string as the label
activity = str(activity)

# TODO We could implement some fancy logic in spans.py to change the label
# if no other spans are running at the same time.
if not self.span_tag_selector.value and activity == "idle or other spans":
activity = "idle"

execute_by_func[function, activity] += v
execute[activity] += v
visible_functions.add(function)
5 changes: 5 additions & 0 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -345,6 +345,7 @@ async def test_FinePerformanceMetrics(c, s, a, b):
# Test with no metrics
cl.update()
assert not cl.visible_functions
assert not cl.visible_activities
assert not cl.span_tag_selector.options
assert not cl.function_selector.options
assert cl.unit_selector.options == ["seconds"]
@@ -393,6 +394,8 @@ def f():
assert "('foo', 1)" in cl.visible_activities
assert "None" in cl.visible_activities
assert "hideme" not in cl.visible_activities
assert "idle" in cl.visible_activities
assert "idle or other spans" not in cl.visible_activities
assert sorted(cl.span_tag_selector.options) == ["default", "foo"]

orig_activities = cl.visible_activities[:]
@@ -417,6 +420,8 @@ def f():
cl.update()
assert sorted(cl.visible_functions) == ["N/A", "y", "z"]
assert sorted(cl.function_selector.options) == ["N/A", "v", "w", "x", "y", "z"]
assert "idle" not in cl.visible_activities
assert "idle or other spans" in cl.visible_activities


@gen_cluster(
101 changes: 39 additions & 62 deletions distributed/spans.py
@@ -15,9 +15,13 @@
from distributed.metrics import time

if TYPE_CHECKING:
# Needed to avoid Sphinx WARNING: more than one target found for cross-reference
# 'TaskState' and 'WorkerState'"
# https://github.com/agronholm/sphinx-autodoc-typehints#dealing-with-circular-imports
from distributed import Scheduler, Worker
from distributed import scheduler as scheduler_module
from distributed.client import SourceCode
from distributed.scheduler import TaskGroup, TaskState, TaskStateState, WorkerState
from distributed.scheduler import TaskGroup, TaskStateState


@contextmanager
@@ -35,45 +39,17 @@ def span(*tags: str) -> Iterator[str]:
may end up with overlapping default spans, e.g. if a worker crashes and all unique
tasks that were in memory on it need to be recomputed.
Examples
--------
>>> import dask.array as da
>>> import distributed
>>> client = distributed.Client()
>>> with span("my workflow"):
... with span("phase 1"):
... a = da.random.random(10)
... b = a + 1
... with span("phase 2"):
... c = b * 2
... d = c.sum()
>>> d.compute()
In the above example,
- Tasks of collections a and b are annotated to belong to span
``('my workflow', 'phase 1')``;
- Tasks of collection c (that aren't already part of a or b) are annotated to belong
to span ``('my workflow', 'phase 2')``;
- Tasks of collection d (that aren't already part of a, b, or c) are *not*
annotated but will nonetheless be attached to span ``('default', )``.
You may also set more than one tag at once; e.g.
>>> with span("workflow1", "version1"):
... ...
Finally, you may capture the ID of a span on the client to match it with the
:class:`Span` objects the scheduler:
>>> cluster = distributed.LocalCluster()
>>> client = distributed.Client(cluster)
You may capture the ID of a span on the client to match it with the
:class:`~distributed.spans.Span` objects on the scheduler:
>>> client = Client()
>>> with span("my workflow") as span_id:
... client.submit(lambda: "Hello world!").result()
>>> span = client.cluster.scheduler.extensions["spans"].spans[span_id]
>>> client.cluster.scheduler.extensions["spans"].spans[span_id]
Span<name=('my workflow',), id=5dc9b908-116b-49a5-b0d7-5a681f49a111>
Notes
-----
Spans are based on annotations, and just like annotations they can be lost during
optimization. Set config ``optimization.fuse.active: false`` to prevent this issue.
You may retrieve the current span with ``dask.get_annotations().get("span")``.
You can do so in the client code as well as from inside a task.
"""
@@ -97,8 +73,8 @@ class Span:
#: worker.
name: tuple[str, ...]

#: <uuid>
#: Taken from ``TaskState.annotations["span"]["id"][-1]``.
#: Unique ID, generated by :func:`~distributed.span` and
#: taken from ``TaskState.annotations["span"]["id"][-1]``.
#: Matches ``distributed.scheduler.TaskState.group.span_id``
#: and ``distributed.worker_state_machine.TaskState.span_id``.
id: str
@@ -110,23 +86,24 @@

#: Task groups *directly* belonging to this span.
#:
#: See also
#: See Also
#: --------
#: traverse_groups
#:
#: Notes
#: -----
#: TaskGroups are forgotten when the last task is forgotten. If a user calls
#: compute() twice on the same collection, you'll have more than one group with the
#: same tg.name in this set! For the same reason, while the same TaskGroup object is
#: guaranteed to be attached to exactly one Span, you may have different TaskGroups
#: with the same key attached to different Spans.
#: TaskGroups are forgotten by the Scheduler when the last task is forgotten, but
#: remain referenced here indefinitely. If a user calls compute() twice on the same
#: collection, you'll have more than one group with the same tg.name in this set!
#: For the same reason, while the same TaskGroup object is guaranteed to be attached
#: to exactly one Span, you may have different TaskGroups with the same key attached
#: to different Spans.
groups: set[TaskGroup]

#: Time when the span first appeared on the scheduler.
#: The same property on parent spans is always less than or equal to this.
#:
#: See also
#: See Also
#: --------
#: start
#: stop
@@ -212,12 +189,12 @@ def start(self) -> float:
"""Earliest time when a task belonging to this span tree started computing;
0 if no task has *finished* computing yet.
Note
----
Notes
-----
This is not updated until at least one task has *finished* computing.
It could move backwards as tasks complete.
See also
See Also
--------
enqueued
stop
@@ -242,7 +219,7 @@ def stop(self) -> float:
This differs from ``TaskGroup.stop`` when there aren't unfinished tasks; it also
will never be zero.
See also
See Also
--------
enqueued
start
@@ -262,7 +239,7 @@ def states(self) -> dict[TaskStateState, int]:
"""The number of tasks currently in each state in this span tree;
e.g. ``{"memory": 10, "processing": 3, "released": 4, ...}``.
See also
See Also
--------
distributed.scheduler.TaskGroup.states
"""
@@ -278,7 +255,7 @@ def done(self) -> bool:
added or when a worker that contained the only replica of a task in memory
crashes and the task need to be recomputed.
See also
See Also
--------
distributed.scheduler.TaskGroup.done
"""
@@ -288,7 +265,7 @@
def all_durations(self) -> dict[str, float]:
"""Cumulative duration of all completed actions in this span tree, by action
See also
See Also
--------
duration
distributed.scheduler.TaskGroup.all_durations
@@ -299,7 +276,7 @@ def all_durations(self) -> dict[str, float]:
def duration(self) -> float:
"""The total amount of time spent on all tasks in this span tree
See also
See Also
--------
all_durations
distributed.scheduler.TaskGroup.duration
@@ -310,7 +287,7 @@ def duration(self) -> float:
def nbytes_total(self) -> int:
"""The total number of bytes that this span tree has produced
See also
See Also
--------
distributed.scheduler.TaskGroup.nbytes_total
"""
@@ -329,9 +306,9 @@ def code(self) -> list[tuple[SourceCode, ...]]:

@property
def cumulative_worker_metrics(self) -> dict[tuple[Hashable, ...], float]:
"""Replica of Worker.digests_total and Scheduler.cumulative_worker_metrics, but
only for the metrics that can be attributed to the current span tree.
The span_id has been removed from the key.
"""Replica of ``Worker.digests_total`` and
``Scheduler.cumulative_worker_metrics``, but only for the metrics that can be
attributed to the current span tree. The span id has been removed from the key.
At the moment of writing, all keys are
``("execute", <task prefix>, <activity>, <unit>)``
@@ -411,7 +388,7 @@ def _active_timeseries(self) -> Iterator[tuple[float, bool]]:
def nthreads_intervals(self) -> list[tuple[float, float, int]]:
"""
Returns
------
-------
List of tuples:
- begin timestamp
@@ -495,7 +472,7 @@ def __init__(self, scheduler: Scheduler):
self.spans_search_by_tag = defaultdict(list)

def observe_tasks(
self, tss: Iterable[TaskState], code: tuple[SourceCode, ...]
self, tss: Iterable[scheduler_module.TaskState], code: tuple[SourceCode, ...]
) -> dict[str, dict]:
"""Acknowledge the existence of runnable tasks on the scheduler. These may
either be new tasks, tasks that were previously unrunnable, or tasks that were
@@ -612,13 +589,13 @@ def merge_by_tags(self, *tags: str) -> Span:
return Span.merge(*self.find_by_tags(*tags))

def heartbeat(
self, ws: WorkerState, data: dict[tuple[Hashable, ...], float]
self, ws: scheduler_module.WorkerState, data: dict[tuple[Hashable, ...], float]
) -> None:
"""Triggered by SpansWorkerExtension.heartbeat().
"""Triggered by :meth:`SpansWorkerExtension.heartbeat`.
Populate :meth:`Span.cumulative_worker_metrics` with data from the worker.
See also
See Also
--------
SpansWorkerExtension.heartbeat
Span.cumulative_worker_metrics
@@ -661,7 +638,7 @@ def heartbeat(self) -> dict[tuple[Hashable, ...], float]:
-------
``{(context, span_id, prefix, activity, unit): value}}``
See also
See Also
--------
SpansSchedulerExtension.heartbeat
Span.cumulative_worker_metrics
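A short sketch (assumptions: in-process scheduler, work already tagged with spans) of querying spans by tag through the extension's ``find_by_tags`` / ``merge_by_tags`` shown in this file:

    from distributed import Client, span

    client = Client(processes=False)  # assumption: local cluster

    with span("my workflow", "version1"):
        client.submit(lambda x: x + 1, 1).result()

    ext = client.cluster.scheduler.extensions["spans"]
    matching = list(ext.find_by_tags("my workflow"))   # every span carrying the tag
    merged = ext.merge_by_tags("my workflow")          # one synthetic Span aggregating them
    print(merged.duration, merged.nbytes_total)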
3 changes: 1 addition & 2 deletions distributed/tests/test_spans.py
@@ -8,11 +8,10 @@
from dask import delayed

import distributed
from distributed import Client, Event, Future, Worker, wait
from distributed import Client, Event, Future, Worker, span, wait
from distributed.compatibility import WINDOWS
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.metrics import time
from distributed.spans import span
from distributed.utils_test import (
NoSchedulerDelayWorker,
async_poll_for,