diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst index 9b579cc1d5fdfe..fe44157a9ef249 100644 --- a/Doc/library/asyncio-queue.rst +++ b/Doc/library/asyncio-queue.rst @@ -70,6 +70,26 @@ Queue Return an item if one is immediately available, else raise :exc:`QueueEmpty`. + .. method:: iter() + + Return an :term:`asynchronous iterator` which iterates over the queue + of items. If queue is empty, wait until the next item is available. + + Stops iteration if the queue has been shut down and is empty, or if + the queue has been shut down immediately. + + .. versionadded:: 3.14 + + .. method:: iter_nowait() + + Return an :term:`iterator` which iterates over the queue of items + without blocking. + + Only iterate over the items which are immediately available, but raise + the :exc:`QueueEmpty` exception if none are. + + .. versionadded:: 3.14 + .. coroutinemethod:: join() Block until all items in the queue have been received and processed. @@ -191,16 +211,11 @@ concurrent tasks:: async def worker(name, queue): - while True: - # Get a "work item" out of the queue. - sleep_for = await queue.get() - + # Get a "work item" out of the queue. + async for sleep_for in queue.iter(): # Sleep for the "sleep_for" seconds. await asyncio.sleep(sleep_for) - # Notify the queue that the "work item" has been processed. - queue.task_done() - print(f'{name} has slept for {sleep_for:.2f} seconds') @@ -215,22 +230,16 @@ concurrent tasks:: total_sleep_time += sleep_for queue.put_nowait(sleep_for) - # Create three worker tasks to process the queue concurrently. - tasks = [] - for i in range(3): - task = asyncio.create_task(worker(f'worker-{i}', queue)) - tasks.append(task) + # All tasks have been queued + queue.shutdown() - # Wait until the queue is fully processed. + # Create three worker tasks to process the queue concurrently. started_at = time.monotonic() - await queue.join() - total_slept_for = time.monotonic() - started_at + async with asyncio.TaskGroup() as tg: + for i in range(3): + tg.create_task(worker(f'worker-{i}', queue)) - # Cancel our worker tasks. - for task in tasks: - task.cancel() - # Wait until all worker tasks are cancelled. - await asyncio.gather(*tasks, return_exceptions=True) + total_slept_for = time.monotonic() - started_at print('====') print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index fbbebcf4ed8f92..d25d7b24c36ff2 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -132,6 +132,49 @@ provide the public methods described below. will not block. Similarly, if full() returns ``False`` it doesn't guarantee that a subsequent call to put() will not block. +.. method:: Queue.iter(block=True, timeout=None) + + Return an :term:`iterator` which iterates over the queue of items. If optional + args *block* is true and *timeout* is ``None`` (the default), block if necessary + until the next item is available. If *timeout* is a positive number, it blocks + at most *timeout* seconds and stops iteration if no item was available within that + time, but if the first item is not available the :exc:`Empty` exception is raised. + Otherwise (*block* is false), iterate over all item which are immediately + available, but raise the :exc:`Empty` exception if none are (*timeout* is ignored + in that case). + + Stops iteration if the queue has been shut down and is empty, or if the queue has + been shut down immediately. + + Example:: + + import concurrent.futures + import queue + import time + + def worker(name, q): + for item in q.iter(): + time.sleep(.01) + print(f'{name} finished {item}') + + q = queue.Queue() + for item in range(30): + q.put(item) + + q.shutdown() + with concurrent.futures.ThreadPoolExecutor() as tp: + for i in range(3): + tp.submit(worker, f'worker-{i}', q) + + print('All work completed') + + .. versionadded:: 3.14 + +.. method:: Queue.iter_nowait() + + Equivalent to ``iter(False)``. + + .. versionadded:: 3.14 .. method:: Queue.put(item, block=True, timeout=None) @@ -212,8 +255,7 @@ Example of how to wait for enqueued tasks to be completed:: q = queue.Queue() def worker(): - while True: - item = q.get() + for item in q: print(f'Working on {item}') print(f'Finished {item}') q.task_done() diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 804d39ab64646d..61ec8519b9d3c7 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -92,6 +92,12 @@ ast Added :func:`ast.compare` for comparing two ASTs. (Contributed by Batuhan Taskaya and Jeremy Hylton in :issue:`15987`.) +asyncio +------- + +Add :meth:`asyncio.Queue.iter` and :meth:`asyncio.Queue.iter_nowait`. +(Contributed by Wannes Boeykens in :gh:`119154`.) + os -- @@ -107,6 +113,12 @@ pathlib another, like :func:`shutil.copyfile`. (Contributed by Barney Gale in :gh:`73991`.) +queue +----- + +Add :meth:`queue.Queue.iter` and :meth:`queue.Queue.iter_nowait`. +(Contributed by Wannes Boeykens in :gh:`119154`.) + symtable -------- diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 2f3865114a84f9..94767edc249543 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -30,6 +30,20 @@ class QueueShutDown(Exception): pass +class _AsyncQueueIterator: + def __init__(self, queue): + self._queue = queue + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return await self._queue.get() + except QueueShutDown: + raise StopAsyncIteration + + class Queue(mixins._LoopBoundMixin): """A queue, useful for coordinating producer and consumer coroutines. @@ -216,6 +230,33 @@ def get_nowait(self): self._wakeup_next(self._putters) return item + def iter(self): + """Return an asynchronous iterator which iterates over the queue of + items. If queue is empty, wait until the next item is available. + + Stops iteration if the queue has been shut down and is empty, or if the + queue has been shut down immediately. + """ + return _AsyncQueueIterator(self) + + def iter_nowait(self): + """Return an iterator which iterates over the queue of items without + blocking. + + Only iterate over the items which are immediately available, but raise + the QueueEmpty exception if none are. + """ + try: + yield self.get_nowait() + except QueueShutDown: + return + + try: + while True: + yield self.get_nowait() + except (QueueEmpty, QueueShutDown): + return + def task_done(self): """Indicate that a formerly enqueued task is complete. diff --git a/Lib/queue.py b/Lib/queue.py index 25beb46e30d6bd..afab1303415e6f 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -275,6 +275,41 @@ def _put(self, item): def _get(self): return self.queue.popleft() + def iter(self, block=True, timeout=None): + '''Return an iterator which iterates over the queue of items. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until the next item is available. If 'timeout' is a + positive number, it blocks at most 'timeout' seconds and stops + iteration if no item was available within that time, but if the first + item is not available the Empty exception is raised. Otherwise ('block' + is false), iterate over all item which are immediately available, but + raise the Empty exception if none are ('timeout' is ignored in that + case). + + Stops iteration if the queue has been shut down and is empty, or if the + queue has been shut down immediately. + ''' + try: + yield self.get(block=block, timeout=timeout) + except ShutDown: + return + + try: + while True: + yield self.get(block=block, timeout=timeout) + except (Empty, ShutDown): + return + + def iter_nowait(self): + '''Return an iterator which iterates over the queue of items without + blocking. + + Only iterate over the items which are immediately available, but raise + the Empty exception if none are. + ''' + return self.iter(block=False) + __class_getitem__ = classmethod(types.GenericAlias) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 5019e9a293525d..f22df5e54cb05f 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -116,6 +116,37 @@ async def putter(): self.assertTrue(t.done()) self.assertTrue(t.result()) + async def test_iter(self): + q = asyncio.Queue() + accumulator = 0 + + async def worker(): + nonlocal accumulator + + async for item in q.iter(): + accumulator += item + + async with asyncio.TaskGroup() as tg: + tg.create_task(worker()) + tg.create_task(worker()) + for i in range(100): + q.put_nowait(i) + + q.shutdown() + + self.assertEqual(sum(range(100)), accumulator) + + async def test_iter_nowait(self): + q = asyncio.Queue() + accumulator = 0 + for i in range(100): + q.put_nowait(i) + + for item in q.iter_nowait(): + accumulator += item + + self.assertEqual(sum(range(100)), accumulator) + class QueueGetTests(unittest.IsolatedAsyncioTestCase): @@ -167,6 +198,8 @@ def test_nonblocking_get(self): def test_nonblocking_get_exception(self): q = asyncio.Queue() self.assertRaises(asyncio.QueueEmpty, q.get_nowait) + with self.assertRaises(asyncio.QueueEmpty): + list(q.iter_nowait()) async def test_get_cancelled_race(self): q = asyncio.Queue() @@ -471,27 +504,22 @@ async def test_task_done(self): # Two workers get items from the queue and call task_done after each. # Join the queue and assert all items have been processed. - running = True async def worker(): nonlocal accumulator - while running: - item = await q.get() + async for item in q.iter(): accumulator += item q.task_done() async with asyncio.TaskGroup() as tg: - tasks = [tg.create_task(worker()) - for index in range(2)] - + tg.create_task(worker()) + tg.create_task(worker()) await q.join() self.assertEqual(sum(range(100)), accumulator) # close running generators - running = False - for i in range(len(tasks)): - q.put_nowait(0) + q.shutdown() async def test_join_empty_queue(self): q = self.q_class() @@ -566,6 +594,7 @@ async def test_shutdown_empty(self): await q.get() with self.assertRaisesShutdown(): q.get_nowait() + self.assertEqual(list(q.iter_nowait()), []) async def test_shutdown_nonempty(self): # Test shutting down a non-empty queue @@ -610,6 +639,7 @@ async def test_shutdown_nonempty(self): await q.get() with self.assertRaisesShutdown(): q.get_nowait() + self.assertEqual(list(q.iter_nowait()), []) # Ensure there is 1 unfinished task, and join() task succeeds q.task_done() @@ -652,6 +682,7 @@ async def test_shutdown_immediate(self): await q.get() with self.assertRaisesShutdown(): q.get_nowait() + self.assertEqual(list(q.iter_nowait()), []) # Ensure there are no unfinished tasks with self.assertRaises( diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 6dced7df0064d7..cf2e59ba4eb83e 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -1,5 +1,6 @@ # Some simple queue module tests, plus some failure conditions # to ensure the Queue locks remain stable. +import concurrent.futures import itertools import random import threading @@ -150,34 +151,63 @@ def basic_queue_test(self, q): self.do_blocking_test(q.get, (), q.put, ('empty',)) self.do_blocking_test(q.get, (True, 10), q.put, ('empty',)) + def test_iter(self): + q = self.type2test() + self.cum = 0 - def worker(self, q): - while True: - x = q.get() - if x < 0: - q.task_done() - return - with self.cumlock: - self.cum += x - q.task_done() + def worker(): + for x in q.iter(): + with self.cumlock: + self.cum += x - def queue_join_test(self, q): + with concurrent.futures.ThreadPoolExecutor() as tp: + tp.submit(worker) + tp.submit(worker) + for i in range(100): + q.put(i) + + q.shutdown() + + self.assertEqual(self.cum, sum(range(100))) + + def test_iter_nowait(self): + q = self.type2test() self.cum = 0 - threads = [] - for i in (0,1): - thread = threading.Thread(target=self.worker, args=(q,)) - thread.start() - threads.append(thread) for i in range(100): q.put(i) - q.join() - self.assertEqual(self.cum, sum(range(100)), - "q.join() did not block until all tasks were done") - for i in (0,1): - q.put(-1) # instruct the threads to close - q.join() # verify that you can join twice - for thread in threads: - thread.join() + + for x in q.iter_nowait(): + self.cum += x + + self.assertEqual(self.cum, sum(range(100))) + + def queue_join_test(self, q): + self.cum = 0 + + def worker(): + for x in q.iter(): + with self.cumlock: + self.cum += x + q.task_done() + + with concurrent.futures.ThreadPoolExecutor() as tp: + tp.submit(worker) + tp.submit(worker) + for i in range(100): + q.put(i) + + q.join() + self.assertEqual(self.cum, sum(range(100)), + "q.join() didn't block until all tasks were done") + for i in range(100, 200): + q.put(i) + + q.join() # verify that you can join twice + self.assertEqual(self.cum, sum(range(200)), + "q.join() didn't block until all tasks were done") + + # instruct the threads to close + q.shutdown() def test_queue_task_done(self): # Test to make sure a queue task completed successfully. @@ -192,15 +222,15 @@ def test_queue_task_done(self): def test_queue_join(self): # Test that a queue join()s successfully, and before anything else # (done twice for insurance). - q = self.type2test() - self.queue_join_test(q) - self.queue_join_test(q) - try: - q.task_done() - except ValueError: - pass - else: - self.fail("Did not detect task count going negative") + for _ in range(2): + q = self.type2test() + self.queue_join_test(q) + try: + q.task_done() + except ValueError: + pass + else: + self.fail("Did not detect task count going negative") def test_basic(self): # Do it a couple of times on the same queue. @@ -215,6 +245,8 @@ def test_negative_timeout_raises_exception(self): q.put(1, timeout=-1) with self.assertRaises(ValueError): q.get(1, timeout=-1) + with self.assertRaises(ValueError): + list(q.iter(1, timeout=-1)) def test_nowait(self): q = self.type2test(QUEUE_SIZE) @@ -227,6 +259,8 @@ def test_nowait(self): q.get_nowait() with self.assertRaises(self.queue.Empty): q.get_nowait() + with self.assertRaises(self.queue.Empty): + list(q.iter_nowait()) def test_shrinking_queue(self): # issue 10110 @@ -248,6 +282,7 @@ def test_shutdown_empty(self): q.put("data") with self.assertRaises(self.queue.ShutDown): q.get() + self.assertEqual(list(q.iter()), []) def test_shutdown_nonempty(self): q = self.type2test() @@ -256,6 +291,7 @@ def test_shutdown_nonempty(self): q.get() with self.assertRaises(self.queue.ShutDown): q.get() + self.assertEqual(list(q.iter()), []) def test_shutdown_immediate(self): q = self.type2test() @@ -263,6 +299,7 @@ def test_shutdown_immediate(self): q.shutdown(immediate=True) with self.assertRaises(self.queue.ShutDown): q.get() + self.assertEqual(list(q.iter()), []) def test_shutdown_allowed_transitions(self): # allowed transitions would be from alive via shutdown to immediate diff --git a/Misc/NEWS.d/next/Library/2024-06-23-19-35-14.gh-issue-119154.7CXFaX.rst b/Misc/NEWS.d/next/Library/2024-06-23-19-35-14.gh-issue-119154.7CXFaX.rst new file mode 100644 index 00000000000000..a5c571572c5de6 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-06-23-19-35-14.gh-issue-119154.7CXFaX.rst @@ -0,0 +1,3 @@ +Add :meth:`asyncio.Queue.iter` and :meth:`asyncio.Queue.iter_nowait`. +Add :meth:`queue.Queue.iter` and :meth:`queue.Queue.iter_nowait`. +Contributed by Wannes Boeykens.