Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
e9c1a3e
Add `asyncio.Queue.__aiter__()`
nineteendo Jun 14, 2024
3cd59ee
Reduce diff
nineteendo Jun 14, 2024
16fc429
Add test
nineteendo Jun 14, 2024
f13ddb3
Simplify code
nineteendo Jun 14, 2024
afd7ad4
Add to `__all__`
nineteendo Jun 14, 2024
214548c
📜🤖 Added by blurb_it.
blurb-it[bot] Jun 14, 2024
9824b63
Simplify example
nineteendo Jun 14, 2024
6358b8e
Fix indentation
nineteendo Jun 14, 2024
316b800
Improve order
nineteendo Jun 14, 2024
92f3295
Test `StopAsyncIteration`
nineteendo Jun 14, 2024
a18767d
Document `AsyncQueueIterator`
nineteendo Jun 14, 2024
e826a01
Add `queue.Queue.__iter__()`
nineteendo Jun 14, 2024
c8cbe4d
Add `asyncio.Queue.__iter__()`
nineteendo Jun 14, 2024
940e3c0
📜🤖 Added by blurb_it.
blurb-it[bot] Jun 14, 2024
8fca288
Fix lint
nineteendo Jun 14, 2024
499457c
Wait
nineteendo Jun 14, 2024
7f206d9
Update 2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst
nineteendo Jun 14, 2024
320691c
Apply suggestions from code review
nineteendo Jun 15, 2024
f43bdb1
Unpack for loop
nineteendo Jun 15, 2024
4ca4a93
Insert newline
nineteendo Jun 15, 2024
fffacb2
Simplify example
nineteendo Jun 15, 2024
92d7b16
Indent correctly
nineteendo Jun 15, 2024
ce23fce
Document `__aiter__()`
nineteendo Jun 15, 2024
640d2a8
Fix indent attempt 1
nineteendo Jun 15, 2024
3cad181
Move while loop inside try/except block
nineteendo Jun 18, 2024
cc9f7ef
test iteration
nineteendo Jun 18, 2024
92d7dc0
Reduce diff
nineteendo Jun 18, 2024
a92e031
Run test
nineteendo Jun 18, 2024
3621cc1
Reduce diff
nineteendo Jun 18, 2024
98252c6
Adapt example from asyncio
nineteendo Jun 18, 2024
4b1ff4b
Simpler example
nineteendo Jun 18, 2024
5e9f48b
Put definition first
nineteendo Jun 18, 2024
4b382ec
Print worker name
nineteendo Jun 18, 2024
076a7a6
Restore example
nineteendo Jun 18, 2024
6132f5a
Indent example
nineteendo Jun 18, 2024
6c9aa6d
Make `AsyncQueueIterator` private
nineteendo Jun 18, 2024
9a28bc5
Cleanup tests
nineteendo Jun 18, 2024
0a69fd9
Instantiate
nineteendo Jun 18, 2024
e6a5849
Use term
nineteendo Jun 18, 2024
4407eed
Move inside with block
nineteendo Jun 18, 2024
1e500ab
Add comment
nineteendo Jun 18, 2024
583f642
Re-order
nineteendo Jun 18, 2024
1b6e781
Remove comment
nineteendo Jun 18, 2024
101409f
Separate function definition
nineteendo Jun 18, 2024
2d5998a
Update whatsnew
nineteendo Jun 19, 2024
841d50c
Update blurb
nineteendo Jun 19, 2024
bcdb21f
Update whatsnew
nineteendo Jun 19, 2024
0743257
Update blurb
nineteendo Jun 19, 2024
d624bea
Apply suggestions from code review
nineteendo Jun 19, 2024
0a8a72b
Update Doc/library/asyncio-queue.rst
nineteendo Jun 19, 2024
b3199df
Rename 2024-06-14-10-30-43.gh-issue-119154.1MHWA-.rst to 2024-06-14-1…
nineteendo Jun 19, 2024
5e37cd5
Rename 2024-06-14-13-37-45.gh-issue-120499.JK7Mv8.rst to 2024-06-14-1…
nineteendo Jun 19, 2024
d71eb91
Make queue a private attribute
nineteendo Jun 19, 2024
1f05869
Merge branch 'main' into add-Queue.__iter__
nineteendo Jun 19, 2024
23e37f4
Clarify behaviour
nineteendo Jun 20, 2024
3bb4e65
Fix grammar
nineteendo Jun 20, 2024
b23a1b3
Clarify documentation
nineteendo Jun 21, 2024
b4e5c60
Delete Misc/NEWS.d/next/Library/2024-06-14-13-37-45.gh-issue-120499.J…
nineteendo Jun 23, 2024
f6ba54e
Update 3.14.rst
nineteendo Jun 23, 2024
a50f12f
Explicit method
nineteendo Jun 23, 2024
33af2a6
Re-order
nineteendo Jun 23, 2024
99db150
Revert calling `task_done()`
nineteendo Jun 23, 2024
fd96d7d
Add more tests
nineteendo Jun 23, 2024
3fb6f88
Call `iter()`
nineteendo Jun 23, 2024
d2bf0a1
Fix tests
nineteendo Jun 23, 2024
e2f6ad8
Add paragraph about `shutdown()`
nineteendo Jun 23, 2024
10188a3
Test blocking
nineteendo Jun 23, 2024
ef06383
Ensure queue is defined
nineteendo Jun 23, 2024
ae06d69
Update whatsnew
nineteendo Jun 23, 2024
8613f61
📜🤖 Added by blurb_it.
blurb-it[bot] Jun 23, 2024
ba8b134
Explicit method
nineteendo Jun 23, 2024
5d6681a
Fix lint
nineteendo Jun 23, 2024
823389f
Don't use multiple threads
nineteendo Jun 23, 2024
a1c8ebb
Don't use multiple threads
nineteendo Jun 23, 2024
104423a
Add docstrings
nineteendo Jun 23, 2024
d3e06d4
Revert newline
nineteendo Jun 23, 2024
f5aeeac
Delete Misc/NEWS.d/next/Library/2024-06-14-10-30-43.gh-issue-119154.1…
nineteendo Jun 23, 2024
efc7d8e
Remove redundant parentheses
nineteendo Jun 23, 2024
fb98fa6
Update whatsnew
nineteendo Jun 23, 2024
a56505d
Fix typo
nineteendo Jun 23, 2024
6f146f5
📜🤖 Added by blurb_it.
blurb-it[bot] Jun 23, 2024
0bbcfdd
Update whatsnew
nineteendo Jun 23, 2024
81da0e6
Update 3.14.rst
nineteendo Jun 24, 2024
461d1e1
Delete Misc/NEWS.d/next/Library/2024-06-23-20-31-59.gh-issue-120924.2…
nineteendo Jun 24, 2024
48783bb
Merge pull request #16 from nineteendo/add-asyncio.Queue.iter
nineteendo Jun 24, 2024
4d7e55b
Update 2024-06-23-19-35-14.gh-issue-120924.7CXFaX.rst
nineteendo Jun 24, 2024
888ae9e
Remove parentheses
nineteendo Jun 24, 2024
a413d13
Link to github issue
nineteendo Jun 24, 2024
dcdd17e
Rename 2024-06-23-19-35-14.gh-issue-120924.7CXFaX.rst to 2024-06-23-1…
nineteendo Jun 25, 2024
525effe
Update issue number
nineteendo Jun 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 29 additions & 20 deletions Doc/library/asyncio-queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,26 @@ Queue
Return an item if one is immediately available, else raise
:exc:`QueueEmpty`.

.. method:: iter()

Return an :term:`asynchronous iterator` which iterates over the queue
of items. If queue is empty, wait until the next item is available.

Stops iteration if the queue has been shut down and is empty, or if
the queue has been shut down immediately.

.. versionadded:: 3.14

.. method:: iter_nowait()

Return an :term:`iterator` which iterates over the queue of items
without blocking.

Only iterate over the items which are immediately available, but raise
the :exc:`QueueEmpty` exception if none are.

.. versionadded:: 3.14

.. coroutinemethod:: join()

Block until all items in the queue have been received and processed.
Expand Down Expand Up @@ -191,16 +211,11 @@ concurrent tasks::


async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()

# Get a "work item" out of the queue.
async for sleep_for in queue.iter():
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)

# Notify the queue that the "work item" has been processed.
queue.task_done()

print(f'{name} has slept for {sleep_for:.2f} seconds')


Expand All @@ -215,22 +230,16 @@ concurrent tasks::
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)

# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# All tasks have been queued
queue.shutdown()

# Wait until the queue is fully processed.
# Create three worker tasks to process the queue concurrently.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
async with asyncio.TaskGroup() as tg:
for i in range(3):
tg.create_task(worker(f'worker-{i}', queue))

# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
total_slept_for = time.monotonic() - started_at

print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
Expand Down
46 changes: 44 additions & 2 deletions Doc/library/queue.rst
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,49 @@ provide the public methods described below.
will not block. Similarly, if full() returns ``False`` it doesn't
guarantee that a subsequent call to put() will not block.

.. method:: Queue.iter(block=True, timeout=None)

Return an :term:`iterator` which iterates over the queue of items. If optional
args *block* is true and *timeout* is ``None`` (the default), block if necessary
until the next item is available. If *timeout* is a positive number, it blocks
at most *timeout* seconds and stops iteration if no item was available within that
time, but if the first item is not available the :exc:`Empty` exception is raised.
Otherwise (*block* is false), iterate over all item which are immediately
available, but raise the :exc:`Empty` exception if none are (*timeout* is ignored
in that case).

Stops iteration if the queue has been shut down and is empty, or if the queue has
been shut down immediately.

Example::

import concurrent.futures
import queue
import time

def worker(name, q):
for item in q.iter():
time.sleep(.01)
print(f'{name} finished {item}')

q = queue.Queue()
for item in range(30):
q.put(item)

q.shutdown()
with concurrent.futures.ThreadPoolExecutor() as tp:
for i in range(3):
tp.submit(worker, f'worker-{i}', q)

print('All work completed')

.. versionadded:: 3.14

.. method:: Queue.iter_nowait()

Equivalent to ``iter(False)``.

.. versionadded:: 3.14

.. method:: Queue.put(item, block=True, timeout=None)

Expand Down Expand Up @@ -212,8 +255,7 @@ Example of how to wait for enqueued tasks to be completed::
q = queue.Queue()

def worker():
while True:
item = q.get()
for item in q:
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
Expand Down
12 changes: 12 additions & 0 deletions Doc/whatsnew/3.14.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ ast
Added :func:`ast.compare` for comparing two ASTs.
(Contributed by Batuhan Taskaya and Jeremy Hylton in :issue:`15987`.)

asyncio
-------

Add :meth:`asyncio.Queue.iter` and :meth:`asyncio.Queue.iter_nowait`.
(Contributed by Wannes Boeykens in :gh:`119154`.)

os
--

Expand All @@ -107,6 +113,12 @@ pathlib
another, like :func:`shutil.copyfile`.
(Contributed by Barney Gale in :gh:`73991`.)

queue
-----

Add :meth:`queue.Queue.iter` and :meth:`queue.Queue.iter_nowait`.
(Contributed by Wannes Boeykens in :gh:`119154`.)

symtable
--------

Expand Down
41 changes: 41 additions & 0 deletions Lib/asyncio/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ class QueueShutDown(Exception):
pass


class _AsyncQueueIterator:
def __init__(self, queue):
self._queue = queue

def __aiter__(self):
return self

async def __anext__(self):
try:
return await self._queue.get()
except QueueShutDown:
raise StopAsyncIteration


class Queue(mixins._LoopBoundMixin):
"""A queue, useful for coordinating producer and consumer coroutines.

Expand Down Expand Up @@ -216,6 +230,33 @@ def get_nowait(self):
self._wakeup_next(self._putters)
return item

def iter(self):
"""Return an asynchronous iterator which iterates over the queue of
items. If queue is empty, wait until the next item is available.

Stops iteration if the queue has been shut down and is empty, or if the
queue has been shut down immediately.
"""
return _AsyncQueueIterator(self)

def iter_nowait(self):
"""Return an iterator which iterates over the queue of items without
blocking.

Only iterate over the items which are immediately available, but raise
the QueueEmpty exception if none are.
"""
try:
yield self.get_nowait()
except QueueShutDown:
return

try:
while True:
yield self.get_nowait()
except (QueueEmpty, QueueShutDown):
return

def task_done(self):
"""Indicate that a formerly enqueued task is complete.

Expand Down
35 changes: 35 additions & 0 deletions Lib/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,41 @@ def _put(self, item):
def _get(self):
return self.queue.popleft()

def iter(self, block=True, timeout=None):
'''Return an iterator which iterates over the queue of items.

If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until the next item is available. If 'timeout' is a
positive number, it blocks at most 'timeout' seconds and stops
iteration if no item was available within that time, but if the first
item is not available the Empty exception is raised. Otherwise ('block'
is false), iterate over all item which are immediately available, but
raise the Empty exception if none are ('timeout' is ignored in that
case).

Stops iteration if the queue has been shut down and is empty, or if the
queue has been shut down immediately.
'''
try:
yield self.get(block=block, timeout=timeout)
except ShutDown:
return

try:
while True:
yield self.get(block=block, timeout=timeout)
except (Empty, ShutDown):
return

def iter_nowait(self):
'''Return an iterator which iterates over the queue of items without
blocking.

Only iterate over the items which are immediately available, but raise
the Empty exception if none are.
'''
return self.iter(block=False)

__class_getitem__ = classmethod(types.GenericAlias)


Expand Down
49 changes: 40 additions & 9 deletions Lib/test/test_asyncio/test_queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,37 @@ async def putter():
self.assertTrue(t.done())
self.assertTrue(t.result())

async def test_iter(self):
q = asyncio.Queue()
accumulator = 0

async def worker():
nonlocal accumulator

async for item in q.iter():
accumulator += item

async with asyncio.TaskGroup() as tg:
tg.create_task(worker())
tg.create_task(worker())
for i in range(100):
q.put_nowait(i)

q.shutdown()

self.assertEqual(sum(range(100)), accumulator)

async def test_iter_nowait(self):
q = asyncio.Queue()
accumulator = 0
for i in range(100):
q.put_nowait(i)

for item in q.iter_nowait():
accumulator += item

self.assertEqual(sum(range(100)), accumulator)


class QueueGetTests(unittest.IsolatedAsyncioTestCase):

Expand Down Expand Up @@ -167,6 +198,8 @@ def test_nonblocking_get(self):
def test_nonblocking_get_exception(self):
q = asyncio.Queue()
self.assertRaises(asyncio.QueueEmpty, q.get_nowait)
with self.assertRaises(asyncio.QueueEmpty):
list(q.iter_nowait())

async def test_get_cancelled_race(self):
q = asyncio.Queue()
Expand Down Expand Up @@ -471,27 +504,22 @@ async def test_task_done(self):

# Two workers get items from the queue and call task_done after each.
# Join the queue and assert all items have been processed.
running = True

async def worker():
nonlocal accumulator

while running:
item = await q.get()
async for item in q.iter():
accumulator += item
q.task_done()

async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(worker())
for index in range(2)]

tg.create_task(worker())
tg.create_task(worker())
await q.join()
self.assertEqual(sum(range(100)), accumulator)

# close running generators
running = False
for i in range(len(tasks)):
q.put_nowait(0)
q.shutdown()

async def test_join_empty_queue(self):
q = self.q_class()
Expand Down Expand Up @@ -566,6 +594,7 @@ async def test_shutdown_empty(self):
await q.get()
with self.assertRaisesShutdown():
q.get_nowait()
self.assertEqual(list(q.iter_nowait()), [])

async def test_shutdown_nonempty(self):
# Test shutting down a non-empty queue
Expand Down Expand Up @@ -610,6 +639,7 @@ async def test_shutdown_nonempty(self):
await q.get()
with self.assertRaisesShutdown():
q.get_nowait()
self.assertEqual(list(q.iter_nowait()), [])

# Ensure there is 1 unfinished task, and join() task succeeds
q.task_done()
Expand Down Expand Up @@ -652,6 +682,7 @@ async def test_shutdown_immediate(self):
await q.get()
with self.assertRaisesShutdown():
q.get_nowait()
self.assertEqual(list(q.iter_nowait()), [])

# Ensure there are no unfinished tasks
with self.assertRaises(
Expand Down
Loading