diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst index 9b579cc1d5fdfe..fe44157a9ef249 100644 --- a/Doc/library/asyncio-queue.rst +++ b/Doc/library/asyncio-queue.rst @@ -70,6 +70,26 @@ Queue Return an item if one is immediately available, else raise :exc:`QueueEmpty`. + .. method:: iter() + + Return an :term:`asynchronous iterator` which iterates over the queue + of items. If queue is empty, wait until the next item is available. + + Stops iteration if the queue has been shut down and is empty, or if + the queue has been shut down immediately. + + .. versionadded:: 3.14 + + .. method:: iter_nowait() + + Return an :term:`iterator` which iterates over the queue of items + without blocking. + + Only iterate over the items which are immediately available, but raise + the :exc:`QueueEmpty` exception if none are. + + .. versionadded:: 3.14 + .. coroutinemethod:: join() Block until all items in the queue have been received and processed. @@ -191,16 +211,11 @@ concurrent tasks:: async def worker(name, queue): - while True: - # Get a "work item" out of the queue. - sleep_for = await queue.get() - + # Get a "work item" out of the queue. + async for sleep_for in queue.iter(): # Sleep for the "sleep_for" seconds. await asyncio.sleep(sleep_for) - # Notify the queue that the "work item" has been processed. - queue.task_done() - print(f'{name} has slept for {sleep_for:.2f} seconds') @@ -215,22 +230,16 @@ concurrent tasks:: total_sleep_time += sleep_for queue.put_nowait(sleep_for) - # Create three worker tasks to process the queue concurrently. - tasks = [] - for i in range(3): - task = asyncio.create_task(worker(f'worker-{i}', queue)) - tasks.append(task) + # All tasks have been queued + queue.shutdown() - # Wait until the queue is fully processed. + # Create three worker tasks to process the queue concurrently. started_at = time.monotonic() - await queue.join() - total_slept_for = time.monotonic() - started_at + async with asyncio.TaskGroup() as tg: + for i in range(3): + tg.create_task(worker(f'worker-{i}', queue)) - # Cancel our worker tasks. - for task in tasks: - task.cancel() - # Wait until all worker tasks are cancelled. - await asyncio.gather(*tasks, return_exceptions=True) + total_slept_for = time.monotonic() - started_at print('====') print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index b357553735e8bb..c75af9edce2065 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -92,6 +92,12 @@ ast Added :func:`ast.compare` for comparing two ASTs. (Contributed by Batuhan Taskaya and Jeremy Hylton in :issue:`15987`.) +asyncio +------- + +Add :meth:`asyncio.Queue.iter` and :meth:`asyncio.Queue.iter_nowait`. +(Contributed by Wannes Boeykens in :gh:`120925`.) + os -- diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 2f3865114a84f9..94767edc249543 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -30,6 +30,20 @@ class QueueShutDown(Exception): pass +class _AsyncQueueIterator: + def __init__(self, queue): + self._queue = queue + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return await self._queue.get() + except QueueShutDown: + raise StopAsyncIteration + + class Queue(mixins._LoopBoundMixin): """A queue, useful for coordinating producer and consumer coroutines. @@ -216,6 +230,33 @@ def get_nowait(self): self._wakeup_next(self._putters) return item + def iter(self): + """Return an asynchronous iterator which iterates over the queue of + items. If queue is empty, wait until the next item is available. + + Stops iteration if the queue has been shut down and is empty, or if the + queue has been shut down immediately. + """ + return _AsyncQueueIterator(self) + + def iter_nowait(self): + """Return an iterator which iterates over the queue of items without + blocking. + + Only iterate over the items which are immediately available, but raise + the QueueEmpty exception if none are. + """ + try: + yield self.get_nowait() + except QueueShutDown: + return + + try: + while True: + yield self.get_nowait() + except (QueueEmpty, QueueShutDown): + return + def task_done(self): """Indicate that a formerly enqueued task is complete. diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 5019e9a293525d..f22df5e54cb05f 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -116,6 +116,37 @@ async def putter(): self.assertTrue(t.done()) self.assertTrue(t.result()) + async def test_iter(self): + q = asyncio.Queue() + accumulator = 0 + + async def worker(): + nonlocal accumulator + + async for item in q.iter(): + accumulator += item + + async with asyncio.TaskGroup() as tg: + tg.create_task(worker()) + tg.create_task(worker()) + for i in range(100): + q.put_nowait(i) + + q.shutdown() + + self.assertEqual(sum(range(100)), accumulator) + + async def test_iter_nowait(self): + q = asyncio.Queue() + accumulator = 0 + for i in range(100): + q.put_nowait(i) + + for item in q.iter_nowait(): + accumulator += item + + self.assertEqual(sum(range(100)), accumulator) + class QueueGetTests(unittest.IsolatedAsyncioTestCase): @@ -167,6 +198,8 @@ def test_nonblocking_get(self): def test_nonblocking_get_exception(self): q = asyncio.Queue() self.assertRaises(asyncio.QueueEmpty, q.get_nowait) + with self.assertRaises(asyncio.QueueEmpty): + list(q.iter_nowait()) async def test_get_cancelled_race(self): q = asyncio.Queue() @@ -471,27 +504,22 @@ async def test_task_done(self): # Two workers get items from the queue and call task_done after each. # Join the queue and assert all items have been processed. - running = True async def worker(): nonlocal accumulator - while running: - item = await q.get() + async for item in q.iter(): accumulator += item q.task_done() async with asyncio.TaskGroup() as tg: - tasks = [tg.create_task(worker()) - for index in range(2)] - + tg.create_task(worker()) + tg.create_task(worker()) await q.join() self.assertEqual(sum(range(100)), accumulator) # close running generators - running = False - for i in range(len(tasks)): - q.put_nowait(0) + q.shutdown() async def test_join_empty_queue(self): q = self.q_class() @@ -566,6 +594,7 @@ async def test_shutdown_empty(self): await q.get() with self.assertRaisesShutdown(): q.get_nowait() + self.assertEqual(list(q.iter_nowait()), []) async def test_shutdown_nonempty(self): # Test shutting down a non-empty queue @@ -610,6 +639,7 @@ async def test_shutdown_nonempty(self): await q.get() with self.assertRaisesShutdown(): q.get_nowait() + self.assertEqual(list(q.iter_nowait()), []) # Ensure there is 1 unfinished task, and join() task succeeds q.task_done() @@ -652,6 +682,7 @@ async def test_shutdown_immediate(self): await q.get() with self.assertRaisesShutdown(): q.get_nowait() + self.assertEqual(list(q.iter_nowait()), []) # Ensure there are no unfinished tasks with self.assertRaises(