Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
e9c1a3e
Add `asyncio.Queue.__aiter__()`
nineteendo Jun 14, 2024
3cd59ee
Reduce diff
nineteendo Jun 14, 2024
16fc429
Add test
nineteendo Jun 14, 2024
f13ddb3
Simplify code
nineteendo Jun 14, 2024
afd7ad4
Add to `__all__`
nineteendo Jun 14, 2024
214548c
📜🤖 Added by blurb_it.
blurb-it[bot] Jun 14, 2024
9824b63
Simplify example
nineteendo Jun 14, 2024
6358b8e
Fix indentation
nineteendo Jun 14, 2024
316b800
Improve order
nineteendo Jun 14, 2024
92f3295
Test `StopAsyncIteration`
nineteendo Jun 14, 2024
a18767d
Document `AsyncQueueIterator`
nineteendo Jun 14, 2024
320691c
Apply suggestions from code review
nineteendo Jun 15, 2024
f43bdb1
Unpack for loop
nineteendo Jun 15, 2024
4ca4a93
Insert newline
nineteendo Jun 15, 2024
fffacb2
Simplify example
nineteendo Jun 15, 2024
92d7b16
Indent correctly
nineteendo Jun 15, 2024
ce23fce
Document `__aiter__()`
nineteendo Jun 15, 2024
640d2a8
Fix indent attempt 1
nineteendo Jun 15, 2024
6c9aa6d
Make `AsyncQueueIterator` private
nineteendo Jun 18, 2024
e6a5849
Use term
nineteendo Jun 18, 2024
583f642
Re-order
nineteendo Jun 18, 2024
1b6e781
Remove comment
nineteendo Jun 18, 2024
2d5998a
Update whatsnew
nineteendo Jun 19, 2024
841d50c
Update blurb
nineteendo Jun 19, 2024
d624bea
Apply suggestions from code review
nineteendo Jun 19, 2024
0a8a72b
Update Doc/library/asyncio-queue.rst
nineteendo Jun 19, 2024
b3199df
Rename 2024-06-14-10-30-43.gh-issue-119154.1MHWA-.rst to 2024-06-14-1…
nineteendo Jun 19, 2024
d71eb91
Make queue a private attribute
nineteendo Jun 19, 2024
23e37f4
Clarify behaviour
nineteendo Jun 20, 2024
3bb4e65
Fix grammar
nineteendo Jun 20, 2024
ba8b134
Explicit method
nineteendo Jun 23, 2024
5d6681a
Fix lint
nineteendo Jun 23, 2024
823389f
Don't use multiple threads
nineteendo Jun 23, 2024
104423a
Add docstrings
nineteendo Jun 23, 2024
d3e06d4
Revert newline
nineteendo Jun 23, 2024
f5aeeac
Delete Misc/NEWS.d/next/Library/2024-06-14-10-30-43.gh-issue-119154.1…
nineteendo Jun 23, 2024
fb98fa6
Update whatsnew
nineteendo Jun 23, 2024
a56505d
Fix typo
nineteendo Jun 23, 2024
6f146f5
📜🤖 Added by blurb_it.
blurb-it[bot] Jun 23, 2024
0bbcfdd
Update whatsnew
nineteendo Jun 23, 2024
81da0e6
Update 3.14.rst
nineteendo Jun 24, 2024
461d1e1
Delete Misc/NEWS.d/next/Library/2024-06-23-20-31-59.gh-issue-120924.2…
nineteendo Jun 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 29 additions & 20 deletions Doc/library/asyncio-queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,26 @@ Queue
Return an item if one is immediately available, else raise
:exc:`QueueEmpty`.

.. method:: iter()

Return an :term:`asynchronous iterator` which iterates over the queue
of items. If queue is empty, wait until the next item is available.

Stops iteration if the queue has been shut down and is empty, or if
the queue has been shut down immediately.

.. versionadded:: 3.14

.. method:: iter_nowait()

Return an :term:`iterator` which iterates over the queue of items
without blocking.

Only iterate over the items which are immediately available, but raise
the :exc:`QueueEmpty` exception if none are.

.. versionadded:: 3.14

.. coroutinemethod:: join()

Block until all items in the queue have been received and processed.
Expand Down Expand Up @@ -191,16 +211,11 @@ concurrent tasks::


async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()

# Get a "work item" out of the queue.
async for sleep_for in queue.iter():
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)

# Notify the queue that the "work item" has been processed.
queue.task_done()

print(f'{name} has slept for {sleep_for:.2f} seconds')


Expand All @@ -215,22 +230,16 @@ concurrent tasks::
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)

# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# All tasks have been queued
queue.shutdown()

# Wait until the queue is fully processed.
# Create three worker tasks to process the queue concurrently.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
async with asyncio.TaskGroup() as tg:
for i in range(3):
tg.create_task(worker(f'worker-{i}', queue))

# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
total_slept_for = time.monotonic() - started_at

print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
Expand Down
6 changes: 6 additions & 0 deletions Doc/whatsnew/3.14.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ ast
Added :func:`ast.compare` for comparing two ASTs.
(Contributed by Batuhan Taskaya and Jeremy Hylton in :issue:`15987`.)

asyncio
-------

Add :meth:`asyncio.Queue.iter` and :meth:`asyncio.Queue.iter_nowait`.
(Contributed by Wannes Boeykens in :gh:`120925`.)

os
--

Expand Down
41 changes: 41 additions & 0 deletions Lib/asyncio/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ class QueueShutDown(Exception):
pass


class _AsyncQueueIterator:
def __init__(self, queue):
self._queue = queue

def __aiter__(self):
return self

async def __anext__(self):
try:
return await self._queue.get()
except QueueShutDown:
raise StopAsyncIteration


class Queue(mixins._LoopBoundMixin):
"""A queue, useful for coordinating producer and consumer coroutines.

Expand Down Expand Up @@ -216,6 +230,33 @@ def get_nowait(self):
self._wakeup_next(self._putters)
return item

def iter(self):
"""Return an asynchronous iterator which iterates over the queue of
items. If queue is empty, wait until the next item is available.

Stops iteration if the queue has been shut down and is empty, or if the
queue has been shut down immediately.
"""
return _AsyncQueueIterator(self)

def iter_nowait(self):
"""Return an iterator which iterates over the queue of items without
blocking.

Only iterate over the items which are immediately available, but raise
the QueueEmpty exception if none are.
"""
try:
yield self.get_nowait()
except QueueShutDown:
return

try:
while True:
yield self.get_nowait()
except (QueueEmpty, QueueShutDown):
return

def task_done(self):
"""Indicate that a formerly enqueued task is complete.

Expand Down
49 changes: 40 additions & 9 deletions Lib/test/test_asyncio/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,37 @@ async def putter():
self.assertTrue(t.done())
self.assertTrue(t.result())

async def test_iter(self):
q = asyncio.Queue()
accumulator = 0

async def worker():
nonlocal accumulator

async for item in q.iter():
accumulator += item

async with asyncio.TaskGroup() as tg:
tg.create_task(worker())
tg.create_task(worker())
for i in range(100):
q.put_nowait(i)

q.shutdown()

self.assertEqual(sum(range(100)), accumulator)

async def test_iter_nowait(self):
q = asyncio.Queue()
accumulator = 0
for i in range(100):
q.put_nowait(i)

for item in q.iter_nowait():
accumulator += item

self.assertEqual(sum(range(100)), accumulator)


class QueueGetTests(unittest.IsolatedAsyncioTestCase):

Expand Down Expand Up @@ -167,6 +198,8 @@ def test_nonblocking_get(self):
def test_nonblocking_get_exception(self):
q = asyncio.Queue()
self.assertRaises(asyncio.QueueEmpty, q.get_nowait)
with self.assertRaises(asyncio.QueueEmpty):
list(q.iter_nowait())

async def test_get_cancelled_race(self):
q = asyncio.Queue()
Expand Down Expand Up @@ -471,27 +504,22 @@ async def test_task_done(self):

# Two workers get items from the queue and call task_done after each.
# Join the queue and assert all items have been processed.
running = True

async def worker():
nonlocal accumulator

while running:
item = await q.get()
async for item in q.iter():
accumulator += item
q.task_done()

async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(worker())
for index in range(2)]

tg.create_task(worker())
tg.create_task(worker())
await q.join()
self.assertEqual(sum(range(100)), accumulator)

# close running generators
running = False
for i in range(len(tasks)):
q.put_nowait(0)
q.shutdown()

async def test_join_empty_queue(self):
q = self.q_class()
Expand Down Expand Up @@ -566,6 +594,7 @@ async def test_shutdown_empty(self):
await q.get()
with self.assertRaisesShutdown():
q.get_nowait()
self.assertEqual(list(q.iter_nowait()), [])

async def test_shutdown_nonempty(self):
# Test shutting down a non-empty queue
Expand Down Expand Up @@ -610,6 +639,7 @@ async def test_shutdown_nonempty(self):
await q.get()
with self.assertRaisesShutdown():
q.get_nowait()
self.assertEqual(list(q.iter_nowait()), [])

# Ensure there is 1 unfinished task, and join() task succeeds
q.task_done()
Expand Down Expand Up @@ -652,6 +682,7 @@ async def test_shutdown_immediate(self):
await q.get()
with self.assertRaisesShutdown():
q.get_nowait()
self.assertEqual(list(q.iter_nowait()), [])

# Ensure there are no unfinished tasks
with self.assertRaises(
Expand Down