Skip to content

Commit

Permalink
add pragma - worker.py, client.py, stealing.py (#5827)
Browse files Browse the repository at this point in the history
  • Loading branch information
scharlottej13 authored Feb 23, 2022
1 parent 158a3e7 commit 8a99ac7
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 16 deletions.
12 changes: 6 additions & 6 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def _del_global_client(c: Client) -> None:
try:
if _global_clients[k] is c:
del _global_clients[k]
except KeyError:
except KeyError: # pragma: no cover
pass


Expand Down Expand Up @@ -444,7 +444,7 @@ def release(self):
self._cleared = True
try:
self.client.loop.add_callback(self.client._dec_ref, stringify(self.key))
except TypeError:
except TypeError: # pragma: no cover
pass # Shutting down, add_callback may be None

def __getstate__(self):
Expand Down Expand Up @@ -848,7 +848,7 @@ def __init__(
elif security is True:
security = Security.temporary()
self._startup_kwargs["security"] = security
elif not isinstance(security, Security):
elif not isinstance(security, Security): # pragma: no cover
raise TypeError("security must be a Security object")

self.security = security
Expand Down Expand Up @@ -2021,7 +2021,7 @@ async def wait(k):
if errors == "skip":
bad_keys.add(key)
bad_data[key] = None
else:
else: # pragma: no cover
raise ValueError("Bad value, `errors=%s`" % errors)

keys = [k for k in keys if k not in bad_keys and k not in data]
Expand Down Expand Up @@ -2061,7 +2061,7 @@ async def wait(k):
self.futures[key].reset()
except KeyError: # TODO: verify that this is safe
pass
else:
else: # pragma: no cover
break

if bad_data and errors == "skip" and isinstance(unpacked, list):
Expand Down Expand Up @@ -2237,7 +2237,7 @@ async def _scatter(
raise TimeoutError("No valid workers found")
# Exclude paused and closing_gracefully workers
nthreads = await self.scheduler.ncores_running(workers=workers)
if not nthreads:
if not nthreads: # pragma: no cover
raise ValueError("No valid workers found")

_, who_has, nbytes = await scatter_to_workers(
Expand Down
4 changes: 2 additions & 2 deletions distributed/stealing.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def move_task_request(self, ts, victim, thief) -> str:
except CommClosedError:
logger.info("Worker comm %r closed while stealing: %r", victim, ts)
return "comm-closed"
except Exception as e:
except Exception as e: # pragma: no cover
logger.exception(e)
if LOG_PDB:
import pdb
Expand Down Expand Up @@ -338,7 +338,7 @@ async def move_task_confirm(self, *, key, state, stimulus_id, worker=None):
self.log(("confirm", *_log_msg))
else:
raise ValueError(f"Unexpected task state: {state}")
except Exception as e:
except Exception as e: # pragma: no cover
logger.exception(e)
if LOG_PDB:
import pdb
Expand Down
16 changes: 8 additions & 8 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1320,7 +1320,7 @@ async def _register_with_scheduler(self):
except OSError:
logger.info("Waiting to connect to: %26s", self.scheduler.address)
await asyncio.sleep(0.1)
except TimeoutError:
except TimeoutError: # pragma: no cover
logger.info("Timed out when connecting to scheduler")
if response["status"] != "OK":
raise ValueError(f"Unexpected response from register: {response!r}")
Expand Down Expand Up @@ -1695,7 +1695,7 @@ async def close(
# before closing self.batched_stream, otherwise the local endpoint
# may be closed too early and errors be raised on the scheduler when
# trying to send closing message.
if self._protocol == "ucx":
if self._protocol == "ucx": # pragma: no cover
await asyncio.sleep(0.2)

if (
Expand Down Expand Up @@ -2123,7 +2123,7 @@ def handle_compute_task(
"error",
}:
recommendations[ts] = "waiting"
else:
else: # pragma: no cover
raise RuntimeError(f"Unexpected task state encountered {ts} {stimulus_id}")

for msg in scheduler_msgs:
Expand Down Expand Up @@ -3239,7 +3239,7 @@ def update_who_has(self, who_has: dict[str, Collection[str]]) -> None:
for worker in workers:
self.has_what[worker].add(dep)
self.pending_data_per_worker[worker].push(dep_ts)
except Exception as e:
except Exception as e: # pragma: no cover
logger.exception(e)
if LOG_PDB:
import pdb
Expand Down Expand Up @@ -3345,7 +3345,7 @@ def release_key(
except CommClosedError:
# Batched stream send might raise if it was already closed
pass
except Exception as e:
except Exception as e: # pragma: no cover
logger.exception(e)
if LOG_PDB:
import pdb
Expand Down Expand Up @@ -3518,7 +3518,7 @@ def ensure_computing(self):
self.transition(ts, "memory", stimulus_id=stimulus_id)
elif ts.state in READY:
self.transition(ts, "executing", stimulus_id=stimulus_id)
except Exception as e:
except Exception as e: # pragma: no cover
logger.exception(e)
if LOG_PDB:
import pdb
Expand Down Expand Up @@ -4403,7 +4403,7 @@ async def _get_data():
)
try:
status = response["status"]
except KeyError:
except KeyError: # pragma: no cover
raise ValueError("Unexpected response", response)
else:
if status == "OK":
Expand Down Expand Up @@ -4825,7 +4825,7 @@ def warn(*args, **kwargs):
"""
try:
worker = get_worker()
except ValueError:
except ValueError: # pragma: no cover
pass
else:
worker.log_event("warn", {"args": args, "kwargs": kwargs})
Expand Down

0 comments on commit 8a99ac7

Please sign in to comment.