
Commit a5cbab5

bpo-39104: Fix hanging ProcessPoolExecutor on shutdown nowait with pickling failure (pythonGH-17670)
As reported initially by @rad-pat in python#6084, the following script causes a deadlock:

```
from concurrent.futures import ProcessPoolExecutor


class ObjectWithPickleError():
    """Triggers a RuntimeError when sending job to the workers"""
    def __reduce__(self):
        raise RuntimeError()


if __name__ == "__main__":
    e = ProcessPoolExecutor()
    f = e.submit(id, ObjectWithPickleError())
    e.shutdown(wait=False)
    f.result()  # Deadlock on get
```

The deadlock happens because the main process closes communication channels that the `queue_management_thread` may still need later. To avoid this, this PR lets the `queue_management_thread` manage all the closing.

https://bugs.python.org/issue39104

Automerge-Triggered-By: @pitrou
1 parent 1ed6161 commit a5cbab5
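After this change the reproducer above no longer hangs: the submitted future completes with the error raised at pickling time instead of blocking forever in `f.result()`. Below is a hedged sketch of that post-fix behaviour; the exact exception surfaced on the future is assumed to be the `RuntimeError` raised by `__reduce__`, not confirmed by the commit itself.

```python
# Sketch of the expected post-fix behaviour of the reproducer (assumption:
# the future carries the RuntimeError raised while pickling the argument).
from concurrent.futures import ProcessPoolExecutor


class ObjectWithPickleError():
    """Triggers a RuntimeError when sending job to the workers"""
    def __reduce__(self):
        raise RuntimeError()


if __name__ == "__main__":
    e = ProcessPoolExecutor()
    f = e.submit(id, ObjectWithPickleError())
    e.shutdown(wait=False)
    try:
        f.result()  # previously deadlocked here
    except Exception as exc:  # assumed: the RuntimeError from __reduce__
        print("task failed instead of hanging:", type(exc).__name__)
```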

File tree: 3 files changed (+100, -23 lines)


Lib/concurrent/futures/process.py (+26, -21)
```diff
@@ -80,18 +80,23 @@
 
 class _ThreadWakeup:
     def __init__(self):
+        self._closed = False
         self._reader, self._writer = mp.Pipe(duplex=False)
 
     def close(self):
-        self._writer.close()
-        self._reader.close()
+        if not self._closed:
+            self._closed = True
+            self._writer.close()
+            self._reader.close()
 
     def wakeup(self):
-        self._writer.send_bytes(b"")
+        if not self._closed:
+            self._writer.send_bytes(b"")
 
     def clear(self):
-        while self._reader.poll():
-            self._reader.recv_bytes()
+        if not self._closed:
+            while self._reader.poll():
+                self._reader.recv_bytes()
 
 
 def _python_exit():
@@ -160,15 +165,17 @@ def __init__(self, work_id, fn, args, kwargs):
 
 class _SafeQueue(Queue):
     """Safe Queue set exception to the future object linked to a job"""
-    def __init__(self, max_size=0, *, ctx, pending_work_items):
+    def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
         self.pending_work_items = pending_work_items
+        self.thread_wakeup = thread_wakeup
         super().__init__(max_size, ctx=ctx)
 
     def _on_queue_feeder_error(self, e, obj):
         if isinstance(obj, _CallItem):
             tb = traceback.format_exception(type(e), e, e.__traceback__)
             e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
             work_item = self.pending_work_items.pop(obj.work_id, None)
+            self.thread_wakeup.wakeup()
             # work_item can be None if another process terminated. In this case,
             # the queue_manager_thread fails all work_items with BrokenProcessPool
             if work_item is not None:
@@ -339,6 +346,8 @@ def shutdown_worker():
 
         # Release the queue's resources as soon as possible.
         call_queue.close()
+        call_queue.join_thread()
+        thread_wakeup.close()
         # If .join() is not called on the created processes then
         # some ctx.Queue methods may deadlock on Mac OS X.
         for p in processes.values():
@@ -566,29 +575,30 @@ def __init__(self, max_workers=None, mp_context=None,
         self._pending_work_items = {}
         self._cancel_pending_futures = False
 
+        # _ThreadWakeup is a communication channel used to interrupt the wait
+        # of the main loop of queue_manager_thread from another thread (e.g.
+        # when calling executor.submit or executor.shutdown). We do not use the
+        # _result_queue to send the wakeup signal to the queue_manager_thread
+        # as it could result in a deadlock if a worker process dies with the
+        # _result_queue write lock still acquired.
+        self._queue_management_thread_wakeup = _ThreadWakeup()
+
         # Create communication channels for the executor
         # Make the call queue slightly larger than the number of processes to
        # prevent the worker processes from idling. But don't make it too big
         # because futures in the call queue cannot be cancelled.
         queue_size = self._max_workers + EXTRA_QUEUED_CALLS
         self._call_queue = _SafeQueue(
             max_size=queue_size, ctx=self._mp_context,
-            pending_work_items=self._pending_work_items)
+            pending_work_items=self._pending_work_items,
+            thread_wakeup=self._queue_management_thread_wakeup)
         # Killed worker processes can produce spurious "broken pipe"
         # tracebacks in the queue's own worker thread. But we detect killed
         # processes anyway, so silence the tracebacks.
         self._call_queue._ignore_epipe = True
         self._result_queue = mp_context.SimpleQueue()
         self._work_ids = queue.Queue()
 
-        # _ThreadWakeup is a communication channel used to interrupt the wait
-        # of the main loop of queue_manager_thread from another thread (e.g.
-        # when calling executor.submit or executor.shutdown). We do not use the
-        # _result_queue to send the wakeup signal to the queue_manager_thread
-        # as it could result in a deadlock if a worker process dies with the
-        # _result_queue write lock still acquired.
-        self._queue_management_thread_wakeup = _ThreadWakeup()
-
     def _start_queue_management_thread(self):
         if self._queue_management_thread is None:
             # When the executor gets garbarge collected, the weakref callback
@@ -692,16 +702,11 @@ def shutdown(self, wait=True, *, cancel_futures=False):
         # To reduce the risk of opening too many files, remove references to
         # objects that use file descriptors.
         self._queue_management_thread = None
-        if self._call_queue is not None:
-            self._call_queue.close()
-            if wait:
-                self._call_queue.join_thread()
-            self._call_queue = None
+        self._call_queue = None
         self._result_queue = None
         self._processes = None
 
         if self._queue_management_thread_wakeup:
-            self._queue_management_thread_wakeup.close()
             self._queue_management_thread_wakeup = None
 
     shutdown.__doc__ = _base.Executor.shutdown.__doc__
```
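The core of the change is that `_ThreadWakeup` becomes safe to close more than once and safe to poke after it has been closed, so ownership of the teardown can move into the queue management thread: `shutdown(wait=False)` now only drops references, while `shutdown_worker()` performs `call_queue.join_thread()` and `thread_wakeup.close()`. A minimal standalone sketch of that guarded-close pattern follows; the names are illustrative, not the stdlib implementation.

```python
# Illustrative sketch of the "guarded close" pattern gained by _ThreadWakeup.
import multiprocessing as mp


class GuardedWakeup:
    def __init__(self):
        self._closed = False
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        # Idempotent: a second close (say, one attempted by shutdown() and one
        # by the management thread) is a no-op instead of an error.
        if not self._closed:
            self._closed = True
            self._writer.close()
            self._reader.close()

    def wakeup(self):
        # After close(), waking up is silently skipped rather than raising
        # OSError on a closed pipe.
        if not self._closed:
            self._writer.send_bytes(b"")


if __name__ == "__main__":
    w = GuardedWakeup()
    w.wakeup()
    w.close()
    w.close()   # no-op, does not raise
    w.wakeup()  # no-op after close
```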

Lib/test/test_concurrent_futures.py (+72, -2)
```diff
@@ -415,13 +415,32 @@ def test_context_manager_shutdown(self):
 
     def test_del_shutdown(self):
         executor = futures.ThreadPoolExecutor(max_workers=5)
-        executor.map(abs, range(-5, 5))
+        res = executor.map(abs, range(-5, 5))
         threads = executor._threads
         del executor
 
         for t in threads:
             t.join()
 
+        # Make sure the results were all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+    def test_shutdown_no_wait(self):
+        # Ensure that the executor cleans up the threads when calling
+        # shutdown with wait=False
+        executor = futures.ThreadPoolExecutor(max_workers=5)
+        res = executor.map(abs, range(-5, 5))
+        threads = executor._threads
+        executor.shutdown(wait=False)
+        for t in threads:
+            t.join()
+
+        # Make sure the results were all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+
     def test_thread_names_assigned(self):
         executor = futures.ThreadPoolExecutor(
             max_workers=5, thread_name_prefix='SpecialPool')
@@ -488,7 +507,7 @@ def test_context_manager_shutdown(self):
 
     def test_del_shutdown(self):
         executor = futures.ProcessPoolExecutor(max_workers=5)
-        list(executor.map(abs, range(-5, 5)))
+        res = executor.map(abs, range(-5, 5))
         queue_management_thread = executor._queue_management_thread
         processes = executor._processes
         call_queue = executor._call_queue
@@ -502,6 +521,31 @@ def test_del_shutdown(self):
             p.join()
         call_queue.join_thread()
 
+        # Make sure the results were all computed before the
+        # executor got shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
+    def test_shutdown_no_wait(self):
+        # Ensure that the executor cleans up the processes when calling
+        # shutdown with wait=False
+        executor = futures.ProcessPoolExecutor(max_workers=5)
+        res = executor.map(abs, range(-5, 5))
+        processes = executor._processes
+        call_queue = executor._call_queue
+        queue_management_thread = executor._queue_management_thread
+        executor.shutdown(wait=False)
+
+        # Make sure that all the executor resources were properly cleaned by
+        # the shutdown process
+        queue_management_thread.join()
+        for p in processes.values():
+            p.join()
+        call_queue.join_thread()
+
+        # Make sure the results were all computed before the executor got
+        # shutdown.
+        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+
 
 create_executor_tests(ProcessPoolShutdownTest,
                       executor_mixins=(ProcessPoolForkMixin,
@@ -1086,6 +1130,32 @@ def test_shutdown_deadlock(self):
         with self.assertRaises(BrokenProcessPool):
             f.result()
 
+    def test_shutdown_deadlock_pickle(self):
+        # Test that the pool calling shutdown with wait=False does not cause
+        # a deadlock if a task fails at pickle after the shutdown call.
+        # Reported in bpo-39104.
+        self.executor.shutdown(wait=True)
+        with self.executor_type(max_workers=2,
+                                mp_context=get_context(self.ctx)) as executor:
+            self.executor = executor  # Allow clean up in fail_on_deadlock
+
+            # Start the executor and get the queue_management_thread to collect
+            # the threads and avoid dangling thread that should be cleaned up
+            # asynchronously.
+            executor.submit(id, 42).result()
+            queue_manager = executor._queue_management_thread
+
+            # Submit a task that fails at pickle and shutdown the executor
+            # without waiting
+            f = executor.submit(id, ErrorAtPickle())
+            executor.shutdown(wait=False)
+            with self.assertRaises(PicklingError):
+                f.result()
+
+            # Make sure the executor is eventually shutdown and do not leave
+            # dangling threads
+            queue_manager.join()
+
 
 create_executor_tests(ExecutorDeadlockTest,
                       executor_mixins=(ProcessPoolForkMixin,
```
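For reference, a rough user-level equivalent of the new `test_shutdown_deadlock_pickle` regression test is sketched below. `ErrorAtPickle` is redefined inline as a stand-in for the CPython test-suite helper of the same name, and the pool size and warm-up submit simply mirror the test rather than being requirements.

```python
# Rough user-level equivalent of test_shutdown_deadlock_pickle (a sketch,
# assuming the future surfaces the PicklingError raised by __reduce__).
from concurrent.futures import ProcessPoolExecutor
from pickle import PicklingError


class ErrorAtPickle:
    """Stand-in for the test helper: fails when pickled for the worker."""
    def __reduce__(self):
        raise PicklingError("cannot pickle this object")


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        executor.submit(id, 42).result()        # warm up the pool, as the test does
        f = executor.submit(id, ErrorAtPickle())
        executor.shutdown(wait=False)
        try:
            f.result()                          # used to hang before this fix
        except PicklingError:
            print("pickling failure surfaced instead of deadlocking")
```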
NEWS entry (new file, +2 lines)

```diff
@@ -0,0 +1,2 @@
+Fix hanging ProcessPoolExecutor on ``shutdown(wait=False)`` when a task has
+failed pickling.
```
