Fix test_map and others (#5278)
crusaderky authored Aug 27, 2021
1 parent eedbd4b commit f11a90c
Showing 7 changed files with 35 additions and 17 deletions.
3 changes: 2 additions & 1 deletion distributed/scheduler.py
@@ -2181,7 +2181,7 @@ def _transition(self, key, finish: str, *args, **kwargs):
self._transition_counter += 1
recommendations, client_msgs, worker_msgs = a
elif "released" not in start_finish:
-assert not args and not kwargs, (args, kwargs)
+assert not args and not kwargs, (args, kwargs, start_finish)
a_recs: dict
a_cmsgs: dict
a_wmsgs: dict
@@ -7863,6 +7863,7 @@ def _task_to_msg(state: SchedulerState, ts: TaskState, duration: double = -1) ->

if ts._annotations:
msg["annotations"] = ts._annotations

return msg


2 changes: 1 addition & 1 deletion distributed/stealing.py
@@ -256,7 +256,7 @@ async def move_task_confirm(self, key=None, worker=None, state=None):
await self.scheduler.remove_worker(thief.address)
self.log(("confirm", key, victim.address, thief.address))
else:
raise ValueError("Unexpected task state: %s" % state)
raise ValueError(f"Unexpected task state: {state}")
except Exception as e:
logger.exception(e)
if LOG_PDB:
3 changes: 2 additions & 1 deletion distributed/tests/test_client.py
@@ -5149,7 +5149,8 @@ def f():

@gen_cluster(client=True)
async def test_secede_balances(c, s, a, b):
-count = threading.active_count()
+"""Ensure that tasks scheduled from a seceded thread can be scheduled
+elsewhere"""

def f(x):
client = get_client()
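The new docstring states what this test guards: work submitted from a task that has seceded from its worker thread can still be scheduled elsewhere. A minimal sketch of that pattern outside the test harness, assuming an in-process cluster; launch_more_work is a hypothetical task, not the test's f:

from distributed import Client, get_client, secede


def launch_more_work(x):
    # Inside a running task: grab the client, submit more work, then secede
    # so the worker thread is freed up while we wait on the nested futures.
    client = get_client()
    futures = client.map(lambda i: i + x, range(4))
    secede()
    return sum(client.gather(futures))


if __name__ == "__main__":
    with Client(processes=False) as client:
        print(client.submit(launch_more_work, 10).result())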
7 changes: 4 additions & 3 deletions distributed/tests/test_client_executor.py
@@ -126,6 +126,7 @@ def test_cancellation_as_completed(client):
assert n_cancelled == 2


+@pytest.mark.slow()
def test_map(client):
with client.get_executor() as e:
N = 10
@@ -137,7 +138,7 @@

with client.get_executor(pure=False) as e:
N = 10
-it = e.map(slowinc, range(N), [0.1] * N, timeout=0.4)
+it = e.map(slowinc, range(N), [0.3] * N, timeout=1.2)
results = []
with pytest.raises(TimeoutError):
for x in it:
@@ -147,14 +148,14 @@
with client.get_executor(pure=False) as e:
N = 10
# Not consuming the iterator will cancel remaining tasks
-it = e.map(slowinc, range(N), [0.1] * N)
+it = e.map(slowinc, range(N), [0.3] * N)
for x in take(2, it):
pass
# Some tasks still processing
assert number_of_processing_tasks(client) > 0
# Garbage collect the iterator => remaining tasks are cancelled
del it
-time.sleep(0.05)
+time.sleep(0.5)
assert number_of_processing_tasks(client) == 0


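The new timings keep the per-task delay well below the executor timeout while still guaranteeing that ten 0.3 s tasks cannot all finish on two worker threads within 1.2 s, so the expected TimeoutError fires even on a slow CI machine. A minimal, self-contained sketch of the pattern the test exercises (not the test itself): slowinc is redefined locally as a stand-in for distributed.utils_test.slowinc, and the cluster is sized to two threads to mirror the test fixture.

import concurrent.futures
from time import sleep

from distributed import Client


def slowinc(x, delay=0.1):
    # Local stand-in for distributed.utils_test.slowinc.
    sleep(delay)
    return x + 1


if __name__ == "__main__":
    with Client(processes=False, n_workers=1, threads_per_worker=2) as client:
        with client.get_executor(pure=False) as e:
            # Ten 0.3 s tasks on two threads need ~1.5 s in total, so iterating
            # the results must hit the 1.2 s deadline before they all arrive.
            it = e.map(slowinc, range(10), [0.3] * 10, timeout=1.2)
            results = []
            try:
                for x in it:
                    results.append(x)
            except (TimeoutError, concurrent.futures.TimeoutError):
                # The exact exception class depends on the Python/distributed
                # version; catching both is the conservative choice.
                print(f"timed out after collecting {len(results)} results")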
2 changes: 1 addition & 1 deletion distributed/tests/test_nanny.py
@@ -598,7 +598,7 @@ async def test_failure_during_worker_initialization(s):
assert "Restarting worker" not in logs.getvalue()


-@gen_cluster(client=True, Worker=Nanny, timeout=10000000)
+@gen_cluster(client=True, Worker=Nanny)
async def test_environ_plugin(c, s, a, b):
from dask.distributed import Environ

25 changes: 15 additions & 10 deletions distributed/tests/test_steal.py
@@ -2,6 +2,7 @@
import itertools
import logging
import random
+import sys
import weakref
from operator import mul
from time import sleep
@@ -12,7 +13,7 @@
import dask

from distributed import Nanny, Worker, wait, worker_client
-from distributed.compatibility import LINUX
+from distributed.compatibility import LINUX, WINDOWS
from distributed.config import config
from distributed.metrics import time
from distributed.scheduler import key_split
@@ -152,7 +153,7 @@ def do_nothing(x, y=None):
xs = c.map(do_nothing, range(10), workers=workers[0].address)
await wait(xs)

-futures = c.map(do_nothing, range(1000), y=xs)
+futures = c.map(do_nothing, range(100), y=xs)

await wait(futures)

@@ -362,10 +363,8 @@ async def test_steal_resource_restrictions(c, s, a):

b = await Worker(s.address, loop=s.loop, nthreads=1, resources={"A": 4})

-start = time()
while not b.tasks or len(a.tasks) == 101:
await asyncio.sleep(0.01)
-assert time() < start + 3

assert len(b.tasks) > 0
assert len(a.tasks) < 101
@@ -645,6 +644,12 @@ async def test_steal_communication_heavy_tasks(c, s, a, b):
assert s.processing[b.address]


+@pytest.mark.flaky(
+    condition=WINDOWS and sys.version_info[:2] == (3, 7),
+    reruns=20,
+    reruns_delay=5,
+    reason="b.in_flight_tasks == 1",
+)
@gen_cluster(client=True)
async def test_steal_twice(c, s, a, b):
x = c.submit(inc, 1, workers=a.address)
@@ -680,8 +685,9 @@ async def test_steal_twice(c, s, a, b):
async def test_dont_steal_already_released(c, s, a, b):
future = c.submit(slowinc, 1, delay=0.05, workers=a.address)
key = future.key
-await asyncio.sleep(0.05)
-assert key in a.tasks
+while key not in a.tasks:
+    await asyncio.sleep(0.05)

del future
await asyncio.sleep(0.05)
# In case the system is slow (e.g. network) ensure that nothing bad happens
@@ -694,10 +700,9 @@ async def test_dont_steal_already_released(c, s, a, b):
with captured_logger(
logging.getLogger("distributed.stealing"), level=logging.DEBUG
) as stealing_logs:
-await asyncio.sleep(0.05)

-logs = stealing_logs.getvalue()
-assert f"Key released between request and confirm: {key}" in logs
+msg = f"Key released between request and confirm: {key}"
+while msg not in stealing_logs.getvalue():
+    await asyncio.sleep(0.05)


@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
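Several hunks above replace a fixed sleep followed by an assertion with a loop that polls until the condition holds, relying on the harness's own test timeout to bound the wait. A minimal sketch of that idiom for a generic asyncio test body; poll_until is a hypothetical helper, not part of distributed:

import asyncio


async def poll_until(condition, interval=0.05):
    # Poll a zero-argument callable until it returns True. There is no
    # internal deadline on purpose: in distributed's test suite the
    # @gen_cluster decorator already fails a test that runs too long, so a
    # condition that never becomes true surfaces as a test timeout.
    while not condition():
        await asyncio.sleep(interval)


async def _demo():
    state = {"ready": False}

    async def make_ready():
        await asyncio.sleep(0.2)
        state["ready"] = True

    task = asyncio.create_task(make_ready())
    await poll_until(lambda: state["ready"])
    await task


if __name__ == "__main__":
    asyncio.run(_demo())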
10 changes: 10 additions & 0 deletions distributed/tests/test_worker.py
@@ -2409,6 +2409,11 @@ async def test_hold_on_to_replicas(c, s, *workers):
await asyncio.sleep(0.01)


+@pytest.mark.flaky(
+    condition=WINDOWS and sys.version_info[:2] == (3, 8),
+    reruns=20,
+    reruns_delay=5,
+)
@gen_cluster(client=True)
async def test_worker_reconnects_mid_compute(c, s, a, b):
"""Ensure that, if a worker disconnects while computing a result, the scheduler will
@@ -2474,6 +2479,11 @@ def fast_on_a(lock):
await asyncio.sleep(0.001)


+@pytest.mark.flaky(
+    condition=WINDOWS and sys.version_info[:2] == (3, 8),
+    reruns=20,
+    reruns_delay=5,
+)
@gen_cluster(client=True)
async def test_worker_reconnects_mid_compute_multiple_states_on_scheduler(c, s, a, b):
"""
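The conditional flaky markers above retry a known race only on the affected platform and interpreter version. A minimal sketch of the same pattern, assuming the flaky marker is provided by the pytest-rerunfailures plugin (which accepts condition, reruns, and reruns_delay keywords in recent releases); the WINDOWS constant here is a local stand-in for distributed.compatibility.WINDOWS and the reason string is hypothetical:

import sys

import pytest

WINDOWS = sys.platform.startswith("win")  # stand-in for distributed.compatibility.WINDOWS


@pytest.mark.flaky(
    condition=WINDOWS and sys.version_info[:2] == (3, 8),
    reruns=20,
    reruns_delay=5,
    reason="known platform-specific race (hypothetical)",
)
def test_sometimes_flaky():
    # Rerun up to 20 times, 5 seconds apart, but only when the condition is
    # true; on other platforms a failure is reported immediately.
    assert True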
