Skip to content

Commit

Permalink
Use name argument with Scheduler.remove_plugin calls (#5260)
Browse files Browse the repository at this point in the history
  • Loading branch information
douglasdavis authored Aug 26, 2021
1 parent 332f785 commit eedbd4b
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 13 deletions.
2 changes: 1 addition & 1 deletion distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2113,7 +2113,7 @@ def patch_updates(self):
self.edge_source.patch({"visible": updates})

def __del__(self):
self.scheduler.remove_plugin(self.layout)
self.scheduler.remove_plugin(name=self.layout.name)


class TaskGroupGraph(DashboardComponent):
Expand Down
2 changes: 1 addition & 1 deletion distributed/diagnostics/eventstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def swap_buffer(scheduler, es):


def teardown(scheduler, es):
scheduler.remove_plugin(es)
scheduler.remove_plugin(name=es.name)


async def eventstream(address, interval):
Expand Down
2 changes: 2 additions & 0 deletions distributed/diagnostics/progress.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ def format_time(t):
class AllProgress(SchedulerPlugin):
"""Keep track of all keys, grouped by key_split"""

name = "all-progress"

def __init__(self, scheduler):
self.all = defaultdict(set)
self.nbytes = defaultdict(lambda: 0)
Expand Down
7 changes: 4 additions & 3 deletions distributed/diagnostics/progress_stream.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
from functools import partial

from tlz import merge, valmap

Expand All @@ -21,10 +22,10 @@ def counts(scheduler, allprogress):
)


def remove_plugin(*args, **kwargs):
def remove_plugin(**kwargs):
# Wrapper function around `Scheduler.remove_plugin` to avoid raising a
# `PicklingError` when using a cythonized scheduler
return Scheduler.remove_plugin(*args, **kwargs)
return Scheduler.remove_plugin(**kwargs)


async def progress_stream(address, interval):
Expand Down Expand Up @@ -53,7 +54,7 @@ async def progress_stream(address, interval):
"setup": dumps_function(AllProgress),
"function": dumps_function(counts),
"interval": interval,
"teardown": dumps_function(remove_plugin),
"teardown": dumps_function(partial(remove_plugin, name=AllProgress.name)),
}
)
return comm
Expand Down
9 changes: 5 additions & 4 deletions distributed/diagnostics/tests/test_scheduler_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def remove_worker(self, worker, scheduler):
]

events[:] = []
s.remove_plugin(plugin)
s.remove_plugin(name=plugin.name)
a = await Worker(s.address)
await a.close()
assert events == []
Expand Down Expand Up @@ -104,7 +104,7 @@ async def remove_worker(self, worker, scheduler):
}

events[:] = []
s.remove_plugin(plugin)
s.remove_plugin(name=plugin.name)
async with Worker(s.address):
pass
assert events == []
Expand All @@ -116,8 +116,9 @@ async def start(self, scheduler):
plugin = UnnamedPlugin()
s.add_plugin(plugin)
s.add_plugin(plugin, name="another")
with pytest.raises(ValueError) as excinfo:
s.remove_plugin(plugin)
with pytest.warns(FutureWarning, match="Removing scheduler plugins by value"):
with pytest.raises(ValueError) as excinfo:
s.remove_plugin(plugin)

msg = str(excinfo.value)
assert "Multiple instances of" in msg
Expand Down
3 changes: 3 additions & 0 deletions distributed/diagnostics/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@


class WebsocketPlugin(SchedulerPlugin):

name = "websocket"

def __init__(self, socket, scheduler):
self.socket = socket
self.scheduler = scheduler
Expand Down
2 changes: 1 addition & 1 deletion distributed/http/scheduler/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def on_message(self, message):
self.send("pong", {"timestamp": str(datetime.now())})

def on_close(self):
self.server.remove_plugin(self.plugin)
self.server.remove_plugin(name=self.plugin.name)


routes = [
Expand Down
8 changes: 5 additions & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7037,7 +7037,7 @@ def stop_task_metadata(self, comm=None, name=None):
)

plugin = plugins[0]
self.remove_plugin(plugin)
self.remove_plugin(name=plugin.name)
return {"metadata": plugin.metadata, "state": plugin.state}

async def register_worker_plugin(self, comm, plugin, name=None):
Expand Down Expand Up @@ -8150,6 +8150,8 @@ class WorkerStatusPlugin(SchedulerPlugin):
scheduler.
"""

name = "worker-status"

def __init__(self, scheduler, comm):
self.bcomm = BatchedSend(interval="5ms")
self.bcomm.start(comm)
Expand All @@ -8164,13 +8166,13 @@ def add_worker(self, worker=None, **kwargs):
try:
self.bcomm.send(["add", {"workers": {worker: ident}}])
except CommClosedError:
self.scheduler.remove_plugin(self)
self.scheduler.remove_plugin(name=self.name)

def remove_worker(self, worker=None, **kwargs):
try:
self.bcomm.send(["remove", worker])
except CommClosedError:
self.scheduler.remove_plugin(self)
self.scheduler.remove_plugin(name=self.name)

def teardown(self):
self.bcomm.close()
Expand Down

0 comments on commit eedbd4b

Please sign in to comment.