Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions Doc/library/queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,36 @@ provide the public methods described below.
will not block. Similarly, if full() returns ``False`` it doesn't
guarantee that a subsequent call to put() will not block.

.. method:: Queue.__iter__()

Return an :term:`iterator` which iterates over the queue of items until
:meth:`shutdown` is called and continues iteration until the queue is
empty.

Example::

import concurrent.futures
import queue
import time

def worker(name, q):
for item in q:
time.sleep(.01)
print(f'{name} finished {item}')

q = queue.Queue()
for item in range(30):
q.put(item)

q.shutdown()
with concurrent.futures.ThreadPoolExecutor() as tp:
for i in range(3):
tp.submit(worker, f'worker-{i}', q)

print('All work completed')

.. versionadded:: 3.14


.. method:: Queue.put(item, block=True, timeout=None)

Expand Down Expand Up @@ -212,8 +242,7 @@ Example of how to wait for enqueued tasks to be completed::
q = queue.Queue()

def worker():
while True:
item = q.get()
for item in q:
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
Expand Down
6 changes: 6 additions & 0 deletions Doc/whatsnew/3.14.rst
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ pathlib
another, like :func:`shutil.copyfile`.
(Contributed by Barney Gale in :gh:`73991`.)

queue
-----

Made :class:`queue.Queue` an :term:`iterable`.
(Contributed by Wannes Boeykens in :gh:`120503`.)

symtable
--------

Expand Down
7 changes: 7 additions & 0 deletions Lib/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,13 @@ def _put(self, item):
def _get(self):
return self.queue.popleft()

def __iter__(self):
try:
while True:
yield self.get()
except ShutDown:
return

__class_getitem__ = classmethod(types.GenericAlias)


Expand Down
85 changes: 52 additions & 33 deletions Lib/test/test_queue.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
import concurrent.futures
import itertools
import random
import threading
Expand Down Expand Up @@ -150,34 +151,52 @@ def basic_queue_test(self, q):
self.do_blocking_test(q.get, (), q.put, ('empty',))
self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))

def test_iter(self):
q = self.type2test()
for i in range(100):
q.put(i)

def worker(self, q):
while True:
x = q.get()
if x < 0:
q.task_done()
return
with self.cumlock:
self.cum += x
q.task_done()
q.shutdown()
self.cum = 0

def worker():
for x in q:
with self.cumlock:
self.cum += x

with concurrent.futures.ThreadPoolExecutor() as tp:
tp.submit(worker)
tp.submit(worker)

self.assertEqual(self.cum, sum(range(100)))

def queue_join_test(self, q):
self.cum = 0
threads = []
for i in (0,1):
thread = threading.Thread(target=self.worker, args=(q,))
thread.start()
threads.append(thread)
for i in range(100):
q.put(i)
q.join()
self.assertEqual(self.cum, sum(range(100)),
"q.join() did not block until all tasks were done")
for i in (0,1):
q.put(-1) # instruct the threads to close
q.join() # verify that you can join twice
for thread in threads:
thread.join()

def worker():
for x in q:
with self.cumlock:
self.cum += x
q.task_done()

with concurrent.futures.ThreadPoolExecutor() as tp:
tp.submit(worker)
tp.submit(worker)
for i in range(100):
q.put(i)

q.join()
self.assertEqual(self.cum, sum(range(100)),
"q.join() didn't block until all tasks were done")
for i in range(100, 200):
q.put(i)

q.join() # verify that you can join twice
self.assertEqual(self.cum, sum(range(200)),
"q.join() didn't block until all tasks were done")

# instruct the threads to close
q.shutdown()

def test_queue_task_done(self):
# Test to make sure a queue task completed successfully.
Expand All @@ -192,15 +211,15 @@ def test_queue_task_done(self):
def test_queue_join(self):
# Test that a queue join()s successfully, and before anything else
# (done twice for insurance).
q = self.type2test()
self.queue_join_test(q)
self.queue_join_test(q)
try:
q.task_done()
except ValueError:
pass
else:
self.fail("Did not detect task count going negative")
for _ in range(2):
q = self.type2test()
self.queue_join_test(q)
try:
q.task_done()
except ValueError:
pass
else:
self.fail("Did not detect task count going negative")

def test_basic(self):
# Do it a couple of times on the same queue.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Made :class:`queue.Queue` an :term:`iterable`.
(Contributed by Wannes Boeykens in :gh:`120503`.)