Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e9c1a3e
Add `asyncio.Queue.__aiter__()`
nineteendo Jun 14, 2024
3cd59ee
Reduce diff
nineteendo Jun 14, 2024
16fc429
Add test
nineteendo Jun 14, 2024
f13ddb3
Simplify code
nineteendo Jun 14, 2024
afd7ad4
Add to `__all__`
nineteendo Jun 14, 2024
214548c
📜🤖 Added by blurb_it.
blurb-it[bot] Jun 14, 2024
9824b63
Simplify example
nineteendo Jun 14, 2024
6358b8e
Fix indentation
nineteendo Jun 14, 2024
316b800
Improve order
nineteendo Jun 14, 2024
92f3295
Test `StopAsyncIteration`
nineteendo Jun 14, 2024
a18767d
Document `AsyncQueueIterator`
nineteendo Jun 14, 2024
320691c
Apply suggestions from code review
nineteendo Jun 15, 2024
f43bdb1
Unpack for loop
nineteendo Jun 15, 2024
4ca4a93
Insert newline
nineteendo Jun 15, 2024
fffacb2
Simplify example
nineteendo Jun 15, 2024
92d7b16
Indent correctly
nineteendo Jun 15, 2024
ce23fce
Document `__aiter__()`
nineteendo Jun 15, 2024
640d2a8
Fix indent attempt 1
nineteendo Jun 15, 2024
6c9aa6d
Make `AsyncQueueIterator` private
nineteendo Jun 18, 2024
e6a5849
Use term
nineteendo Jun 18, 2024
583f642
Re-order
nineteendo Jun 18, 2024
1b6e781
Remove comment
nineteendo Jun 18, 2024
2d5998a
Update whatsnew
nineteendo Jun 19, 2024
841d50c
Update blurb
nineteendo Jun 19, 2024
d624bea
Apply suggestions from code review
nineteendo Jun 19, 2024
0a8a72b
Update Doc/library/asyncio-queue.rst
nineteendo Jun 19, 2024
b3199df
Rename 2024-06-14-10-30-43.gh-issue-119154.1MHWA-.rst to 2024-06-14-1…
nineteendo Jun 19, 2024
d71eb91
Make queue a private attribute
nineteendo Jun 19, 2024
23e37f4
Clarify behaviour
nineteendo Jun 20, 2024
3bb4e65
Fix grammar
nineteendo Jun 20, 2024
9eff835
Update 2024-06-14-10-30-43.gh-issue-119154.1MHWA-.rst
nineteendo Jun 24, 2024
8580e98
Remove parentheses
nineteendo Jun 24, 2024
670611f
Update Doc/whatsnew/3.14.rst
nineteendo Jun 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions Doc/library/asyncio-queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ Queue

Number of items allowed in the queue.

.. method:: __aiter__()

Return an :term:`asynchronous iterator` which iterates over
the queue of items until :meth:`shutdown` is called and
continues iteration until the queue is empty.

``shutdown(immediate=True)`` stops iteration immediately.

.. versionadded:: 3.14

.. method:: empty()

Return ``True`` if the queue is empty, ``False`` otherwise.
Expand Down Expand Up @@ -191,16 +201,11 @@ concurrent tasks::


async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()

# Get a "work item" out of the queue.
async for sleep_for in queue:
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)

# Notify the queue that the "work item" has been processed.
queue.task_done()

print(f'{name} has slept for {sleep_for:.2f} seconds')


Expand All @@ -215,22 +220,16 @@ concurrent tasks::
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)

# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# All tasks have been queued
queue.shutdown()

# Wait until the queue is fully processed.
# Create three worker tasks to process the queue concurrently.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
async with asyncio.TaskGroup() as tg:
for i in range(3):
tg.create_task(worker(f'worker-{i}', queue))

# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
total_slept_for = time.monotonic() - started_at

print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
Expand Down
6 changes: 6 additions & 0 deletions Doc/whatsnew/3.14.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ ast
Added :func:`ast.compare` for comparing two ASTs.
(Contributed by Batuhan Taskaya and Jeremy Hylton in :issue:`15987`.)

asyncio
-------

Made :class:`asyncio.Queue` an :term:`asynchronous iterable`.
(Contributed by Wannes Boeykens in :gh:`119154`.)

os
--

Expand Down
17 changes: 17 additions & 0 deletions Lib/asyncio/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ class QueueShutDown(Exception):
pass


class _AsyncQueueIterator:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious why you created a new class here as iterator instead of adding __anext__ to queue itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From #119154 (comment):

it feels more reasonable to describe queues as aiterables than aiterators.

def __init__(self, queue):
self._queue = queue

def __aiter__(self):
return self

async def __anext__(self):
try:
return await self._queue.get()
except QueueShutDown:
raise StopAsyncIteration


class Queue(mixins._LoopBoundMixin):
"""A queue, useful for coordinating producer and consumer coroutines.

Expand Down Expand Up @@ -76,6 +90,9 @@ def _wakeup_next(self, waiters):
waiter.set_result(None)
break

def __aiter__(self):
return _AsyncQueueIterator(self)

def __repr__(self):
return f'<{type(self).__name__} at {id(self):#x} {self._format()}>'

Expand Down
35 changes: 26 additions & 9 deletions Lib/test/test_asyncio/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,28 @@ async def putter():
self.assertTrue(t.done())
self.assertTrue(t.result())

async def test_aiter(self):
q = asyncio.Queue()
for i in range(100):
q.put_nowait(i)

# All tasks have been queued
q.shutdown()

accumulator = 0

async def worker():
nonlocal accumulator

async for item in q:
accumulator += item

async with asyncio.TaskGroup() as tg:
tg.create_task(worker())
tg.create_task(worker())

self.assertEqual(sum(range(100)), accumulator)


class QueueGetTests(unittest.IsolatedAsyncioTestCase):

Expand Down Expand Up @@ -471,27 +493,22 @@ async def test_task_done(self):

# Two workers get items from the queue and call task_done after each.
# Join the queue and assert all items have been processed.
running = True

async def worker():
nonlocal accumulator

while running:
item = await q.get()
async for item in q:
accumulator += item
q.task_done()

async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(worker())
for index in range(2)]

tg.create_task(worker())
tg.create_task(worker())
await q.join()
self.assertEqual(sum(range(100)), accumulator)

# close running generators
running = False
for i in range(len(tasks)):
q.put_nowait(0)
q.shutdown()

async def test_join_empty_queue(self):
q = self.q_class()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Make :class:`asyncio.Queue` an :term:`asynchronous iterable`.
Contributed by Wannes Boeykens.