Skip to content

Commit

Permalink
Add total_failed_tasks_limit option to engine (#29)
Browse files Browse the repository at this point in the history
* Add total_failed_tasks_limit option to engine

* Update readme
  • Loading branch information
insomnes authored Aug 7, 2024
1 parent 170cf4a commit 8cb8735
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 4 deletions.
27 changes: 26 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ to focus on the task logic rather than concurrency challenges.
- [Even more manual management and internal worker queue size](#even-more-manual-management-and-internal-worker-queue-size)
- [Use priroty queue](#use-priroty-queue)
- [Task timeout setting](#task-timeout-setting)
- [Early stopping on too many failed tasks](#early-stopping-on-too-many-failed-tasks)
- [Barebone queue via Foreman](#barebone-queue-via-foreman)
- [Some caveats](#some-caveats)
- [Start load timeout](#start-load-timeout)
Expand Down Expand Up @@ -349,7 +350,7 @@ You can prioritize tasks by setting `use_priority_queue` flag:
await aqute.wait_till_end()

results = aqute.extract_all_results()
assert [t.data for t in results] == [1, 5, 10, 10, 1_000_000]```
assert [t.data for t in results] == [1, 5, 10, 10, 1_000_000]
```

## Task timeout setting
Expand Down Expand Up @@ -382,6 +383,30 @@ aqute = Aqute(
)
```

## Early stopping on too many failed tasks
If you want to stop the processing when too many tasks have failed, you can use the
`total_failed_tasks_limit` option. This will raise `AquteTooManyTasksFailedError` if
the limit is reached before all tasks are processed:
```python
async def failing_handler(task: int) -> int:
await asyncio.sleep(0.01)
if task % 2 == 0:
raise ValueError("Even task number")
return task

aqute = Aqute(
workers_count=2,
handle_coro=failing_handler,
total_failed_tasks_limit=5,
)
for i in range(10):
await aqute.add_task(i)

# This will raise AquteTooManyTasksFailedError cause we have enough failed tasks
# before all tasks are processed
async with aqute:
await aqute.wait_till_end()
```

## Barebone queue via Foreman
If you don't need retry flow and high-level helpers you can use `Foreman` for bare flow,
Expand Down
3 changes: 2 additions & 1 deletion aqute/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from aqute.engine import Aqute
from aqute.errors import AquteError, AquteTaskTimeoutError
from aqute.errors import AquteError, AquteTaskTimeoutError, AquteTooManyTasksFailedError
from aqute.task import AquteTask

__all__ = [
"Aqute",
"AquteError",
"AquteTaskTimeoutError",
"AquteTooManyTasksFailedError",
"AquteTask",
]
24 changes: 23 additions & 1 deletion aqute/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from typing_extensions import Self

from aqute.errors import AquteError
from aqute.errors import AquteError, AquteTooManyTasksFailedError
from aqute.ratelimiter import RateLimiter
from aqute.task import AquteTask, AquteTaskQueueType, TData, TResult
from aqute.worker import Foreman
Expand All @@ -39,6 +39,7 @@ def __init__(
input_task_queue_size: int = 0,
use_priority_queue: bool = False,
task_timeout_seconds: Optional[Union[int, float]] = None,
total_failed_tasks_limit: Optional[int] = None,
):
"""
Engine for reliable running asynchronous tasks via queue with simple retry and
Expand Down Expand Up @@ -68,6 +69,8 @@ def __init__(
Defaults to None. AquteTaskTimeoutError will be raised
if task processing takes longer than this value. To not retry task
on timeout, add this exception to `errors_to_not_retry`.
total_failed_tasks_limit (optional): Maximum failed tasks count before
stopping processing. Defaults to None.
"""
self.result_queue: AquteTaskQueueType[TData, TResult] = (
result_queue or asyncio.Queue()
Expand All @@ -83,6 +86,9 @@ def __init__(
self._use_priority_queue = use_priority_queue
self._task_timeout_seconds = task_timeout_seconds

self._failed_tasks = 0
self._total_failed_limit = total_failed_tasks_limit

self._foreman = Foreman(
handle_coro=self._handle_coro,
workers_count=self._workers_count,
Expand Down Expand Up @@ -131,6 +137,8 @@ async def wait_till_end(self) -> None:
Raises:
AquteError: If the task hasn't been initiated.
AquteTooManyTasksFailedError: If there was limit on failed tasks
and it was reached.
Side Effects:
Marks all tasks as added.
Expand Down Expand Up @@ -374,6 +382,7 @@ async def _process_error_task(self, task: AquteTask[TData, TResult]) -> None:
f"Task {task_id} is not retriable, finishing task "
f"{self._added_tasks_count, self._finished_tasks_count}"
)
self._check_failed_tasks_limit()
return

task.error = None
Expand All @@ -382,6 +391,19 @@ async def _process_error_task(self, task: AquteTask[TData, TResult]) -> None:
)
await self._foreman.add_task(task)

def _check_failed_tasks_limit(self) -> None:
if self._total_failed_limit is None:
return

self._failed_tasks += 1
if self._failed_tasks < self._total_failed_limit:
return

logger.debug(f"Total failed tasks limit reached: {self._total_failed_limit}")
raise AquteTooManyTasksFailedError(
f"Total failed tasks limit reached: {self._total_failed_limit}"
)

def _should_retry_task(self, task: AquteTask[TData, TResult]) -> bool:
if self._errors_to_not_retry and isinstance(
task.error, self._errors_to_not_retry
Expand Down
4 changes: 4 additions & 0 deletions aqute/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,7 @@ class AquteError(Exception):

class AquteTaskTimeoutError(AquteError):
"""Raised when Aqute task coroutine times out if timeout is set"""


class AquteTooManyTasksFailedError(AquteError):
"""Raised when too many tasks failed"""
43 changes: 42 additions & 1 deletion tests/aqute/test_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest

from aqute import Aqute, AquteError
from aqute import Aqute, AquteError, AquteTooManyTasksFailedError


async def non_failing_handler(task: Any) -> Any:
Expand Down Expand Up @@ -53,3 +53,44 @@ async def test_not_raising_on_none_timeout():
async def test_raising_with_timeout():
with pytest.raises(AquteError):
await asyncio.wait_for(aq_wait_coro(0.2), timeout=0.5)


async def failing_handler(task: int) -> int:
await asyncio.sleep(0.01)
if task % 2 == 0:
raise ValueError("Even task number")
return task


@pytest.mark.asyncio
@pytest.mark.parametrize("retry_count", [0, 1], ids=["no_retry", "one_retry"])
async def test_too_many_failed_tasks_error(retry_count: int):
aqute = Aqute(
workers_count=2,
handle_coro=failing_handler,
retry_count=retry_count,
total_failed_tasks_limit=2,
)
for i in range(1, 6):
await aqute.add_task(i)

with pytest.raises(AquteTooManyTasksFailedError) as exc:
async with aqute:
await aqute.wait_till_end()
assert "limit reached: 2" in str(exc.value)


@pytest.mark.asyncio
@pytest.mark.parametrize("retry_count", [0, 1], ids=["no_retry", "one_retry"])
async def test_not_enough_failed_tasks_for_error(retry_count: int):
aqute = Aqute(
workers_count=2,
handle_coro=failing_handler,
retry_count=retry_count,
total_failed_tasks_limit=3,
)
for i in range(1, 6):
await aqute.add_task(i)

async with aqute:
await aqute.wait_till_end()
14 changes: 14 additions & 0 deletions tests/aqute/test_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,17 @@ async def test_failed_tasks():
await aqute.wait_till_end()

check_susccess_and_fails(aqute, 9, 1)


@pytest.mark.asyncio
async def test_total_failed_tasks_limit_do_not_intervene():
aqute = Aqute(
workers_count=2,
handle_coro=failing_handler,
total_failed_tasks_limit=2,
)
async with aqute:
await add_tasks(aqute, 10)
await aqute.wait_till_end()

check_susccess_and_fails(aqute, 9, 1)

0 comments on commit 8cb8735

Please sign in to comment.