Skip to content

Commit

Permalink
Add infinite loop example in README (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
insomnes authored Mar 14, 2024
1 parent 4f53a15 commit e8d04db
Showing 1 changed file with 74 additions and 6 deletions.
80 changes: 74 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ to focus on the task logic rather than concurrency challenges.
- [How to use it?](#how-to-use-it)
- [Simple batch processing](#simple-batch-processing)
- [Result as async generator per completed](#result-as-async-generator-per-completed)
- [Infinite loop](#infinite-loop)
- [Rate limiting](#rate-limiting)
- [Manual task adding, context manager and error retry](#manual-task-adding-context-manager-and-error-retry)
- [Manual flow management and custom result queue](#manual-flow-management-and-custom-result-queue)
Expand Down Expand Up @@ -127,7 +128,7 @@ asyncio.run(main())
## Result as async generator per completed
```python
# Like previous but the result is async generator and the tasks are yielded
# in completition order
# in completion order
input_data = list(range(20))
aqute = Aqute(handle_coro=handler, workers_count=10)

Expand All @@ -143,6 +144,73 @@ asyncio.run(main())

```

## Infinite loop
You can run aqute on "infinite" amount of tasks if needed and control "end"
from outside. Outside of context or with `stop()` coro it will shutdown gracefully.

```python
import asyncio
import logging
from random import random

from aqute import Aqute


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')

logger = logging.getLogger("main")

async def handler(i: int) -> str:
"""
NOTE: This is a mock handler for demonstration purposes.
"""
await asyncio.sleep(0.01 + (0.009) * random())

return f"success {i}"

async def main():
"""
This example shows "infinite"workflow for aqute.
Here limited by 10_000 tasks, but it can be without any end.
"""

# Only to show ruling the done / not done status from outside
TASK_LIMIT = 10_000
done = asyncio.Event()

aqute = Aqute(handle_coro=handler, workers_count=20)

async def add_tasks():
# You can add tasks externally
for i in range(TASK_LIMIT):
await aqute.add_task(i)
# Sleep to simulate some delay between tasks adding (req, calc, etc.)
await asyncio.sleep(0.01 + (0.009) * random())

result = []

async def collect_results():
# You can collect and handle results externally too
while len(result) < TASK_LIMIT:
task = await aqute.get_task_result()
result.append(task)
done.set()

async with aqute:
asyncio.create_task(add_tasks())
asyncio.create_task(collect_results())
# Better to use await done.wait() instead of while loop,
# but it's just for demonstration purposes
while not done.is_set():
logger.info(f"Done tasks: {len(result):_}/{TASK_LIMIT:_}")
await asyncio.sleep(5)

logger.info(f"Done tasks: {len(result):_}/{TASK_LIMIT:_}")


asyncio.run(main())
```

## Rate limiting
You can also add RateLimiter instance to Aqute for rate limiting:
```python
Expand All @@ -160,17 +228,17 @@ You can also add RateLimiter instance to Aqute for rate limiting:
assert len(result) == len(input_data)
```

There are three avaliable `RateLimiter` implementations:
There are three available `RateLimiter` implementations:
- `TokenBucketRateLimiter`: steady rate by default, burstable with `allow_burst` option;
- `SlidingRateLimiter`: next call will be avaliable after enough time from the oldest one;
- `SlidingRateLimiter`: next call will be available after enough time from the oldest one;
- `PerWorkerRateLimiter`: enforces separate rate limits for each unique worker with separate `TokenBucketRateLimiter` instances;
- `RandomizedIntervalRateLimiter`: introduces more random intervals between each call,
but enforcing `max_rate` over `time_period` limit.

You can write your own `RateLimiter` implementation with specific algorithm if needed.

## Manual task adding, context manager and error retry
This can be most useful if not all of your tasks are avaliable at the start:
This can be most useful if not all of your tasks are available at the start:
```python
# You can add tasks manually and also start/stop aqute with context
# manager. And even add tasks on the fly.
Expand Down Expand Up @@ -232,7 +300,7 @@ This can be most useful if not all of your tasks are avaliable at the start:
for _ in range(5):
await result_q.get()

# Now wait till all finished via speicific method, this also notifies
# Now wait till all finished via specific method, this also notifies
# aqute that we have added all tasks
await aqute.wait_till_end()
assert result_q.qsize() == 5
Expand All @@ -259,7 +327,7 @@ This can be most useful if not all of your tasks are avaliable at the start:
```

## Use priroty queue
You can prioretize tasks by setting `use_priority_queue` flag:
You can prioritize tasks by setting `use_priority_queue` flag:

```python
async def handler(i: int) -> int:
Expand Down

0 comments on commit e8d04db

Please sign in to comment.