diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst index 9b579cc1d5fdfe..62c7bac734bd4a 100644 --- a/Doc/library/asyncio-queue.rst +++ b/Doc/library/asyncio-queue.rst @@ -46,6 +46,16 @@ Queue Number of items allowed in the queue. + .. method:: __aiter__() + + Return an :term:`asynchronous iterator` which iterates over + the queue of items until :meth:`shutdown` is called and + continues iteration until the queue is empty. + + ``shutdown(immediate=True)`` stops iteration immediately. + + .. versionadded:: 3.14 + .. method:: empty() Return ``True`` if the queue is empty, ``False`` otherwise. @@ -191,16 +201,11 @@ concurrent tasks:: async def worker(name, queue): - while True: - # Get a "work item" out of the queue. - sleep_for = await queue.get() - + # Get a "work item" out of the queue. + async for sleep_for in queue: # Sleep for the "sleep_for" seconds. await asyncio.sleep(sleep_for) - # Notify the queue that the "work item" has been processed. - queue.task_done() - print(f'{name} has slept for {sleep_for:.2f} seconds') @@ -215,22 +220,16 @@ concurrent tasks:: total_sleep_time += sleep_for queue.put_nowait(sleep_for) - # Create three worker tasks to process the queue concurrently. - tasks = [] - for i in range(3): - task = asyncio.create_task(worker(f'worker-{i}', queue)) - tasks.append(task) + # All tasks have been queued + queue.shutdown() - # Wait until the queue is fully processed. + # Create three worker tasks to process the queue concurrently. started_at = time.monotonic() - await queue.join() - total_slept_for = time.monotonic() - started_at + async with asyncio.TaskGroup() as tg: + for i in range(3): + tg.create_task(worker(f'worker-{i}', queue)) - # Cancel our worker tasks. - for task in tasks: - task.cancel() - # Wait until all worker tasks are cancelled. - await asyncio.gather(*tasks, return_exceptions=True) + total_slept_for = time.monotonic() - started_at print('====') print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds') diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index b357553735e8bb..73f08102e103ef 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -92,6 +92,12 @@ ast Added :func:`ast.compare` for comparing two ASTs. (Contributed by Batuhan Taskaya and Jeremy Hylton in :issue:`15987`.) +asyncio +------- + +Made :class:`asyncio.Queue` an :term:`asynchronous iterable`. +(Contributed by Wannes Boeykens in :gh:`119154`.) + os -- diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 2f3865114a84f9..f799efa0534c40 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -30,6 +30,20 @@ class QueueShutDown(Exception): pass +class _AsyncQueueIterator: + def __init__(self, queue): + self._queue = queue + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return await self._queue.get() + except QueueShutDown: + raise StopAsyncIteration + + class Queue(mixins._LoopBoundMixin): """A queue, useful for coordinating producer and consumer coroutines. @@ -76,6 +90,9 @@ def _wakeup_next(self, waiters): waiter.set_result(None) break + def __aiter__(self): + return _AsyncQueueIterator(self) + def __repr__(self): return f'<{type(self).__name__} at {id(self):#x} {self._format()}>' diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 5019e9a293525d..c5f8e4b36141d1 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -116,6 +116,28 @@ async def putter(): self.assertTrue(t.done()) self.assertTrue(t.result()) + async def test_aiter(self): + q = asyncio.Queue() + for i in range(100): + q.put_nowait(i) + + # All tasks have been queued + q.shutdown() + + accumulator = 0 + + async def worker(): + nonlocal accumulator + + async for item in q: + accumulator += item + + async with asyncio.TaskGroup() as tg: + tg.create_task(worker()) + tg.create_task(worker()) + + self.assertEqual(sum(range(100)), accumulator) + class QueueGetTests(unittest.IsolatedAsyncioTestCase): @@ -471,27 +493,22 @@ async def test_task_done(self): # Two workers get items from the queue and call task_done after each. # Join the queue and assert all items have been processed. - running = True async def worker(): nonlocal accumulator - while running: - item = await q.get() + async for item in q: accumulator += item q.task_done() async with asyncio.TaskGroup() as tg: - tasks = [tg.create_task(worker()) - for index in range(2)] - + tg.create_task(worker()) + tg.create_task(worker()) await q.join() self.assertEqual(sum(range(100)), accumulator) # close running generators - running = False - for i in range(len(tasks)): - q.put_nowait(0) + q.shutdown() async def test_join_empty_queue(self): q = self.q_class() diff --git a/Misc/NEWS.d/next/Library/2024-06-14-10-30-43.gh-issue-119154.1MHWA-.rst b/Misc/NEWS.d/next/Library/2024-06-14-10-30-43.gh-issue-119154.1MHWA-.rst new file mode 100644 index 00000000000000..9211ec561751ea --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-06-14-10-30-43.gh-issue-119154.1MHWA-.rst @@ -0,0 +1,2 @@ +Make :class:`asyncio.Queue` an :term:`asynchronous iterable`. +Contributed by Wannes Boeykens.