Skip to content

Commit

Permalink
Fix early stopping with helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
insomnes committed Oct 8, 2024
1 parent 18da71d commit 5f7e0ce
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
12 changes: 11 additions & 1 deletion aqute/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,18 @@ async def get_task_result(self) -> AquteTask[TData, TResult]:
Returns:
AquteTask with result or error set
Raises:
AquteTooManyTasksFailedError: If there was limit on failed tasks
and it was reached.
"""
return await self.result_queue.get()
while True:
with contextlib.suppress(asyncio.TimeoutError):
return await asyncio.wait_for(self.result_queue.get(), timeout=0.1)
if self.aiotask_of_run_load is None:
raise AquteError("Cannot get task result without started load")
if self.aiotask_of_run_load.done():
self.aiotask_of_run_load.result()
continue

def extract_all_results(self) -> list[AquteTask[TData, TResult]]:
"""
Expand Down
29 changes: 29 additions & 0 deletions tests/aqute/test_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,35 @@ async def test_too_many_failed_tasks_error(retry_count: int):
assert "limit reached: 2" in str(exc.value)


@pytest.mark.asyncio
async def test_too_many_failed_tasks_error_with_helper_each():
aqute = Aqute(
workers_count=2,
handle_coro=failing_handler,
retry_count=0,
total_failed_tasks_limit=1,
)

with pytest.raises(AquteTooManyTasksFailedError) as exc:
async for _ in aqute.apply_to_each(range(1, 6)):
pass
assert "limit reached: 1" in str(exc.value)


@pytest.mark.asyncio
async def test_too_many_failed_tasks_error_with_helper_all():
aqute = Aqute(
workers_count=2,
handle_coro=failing_handler,
retry_count=0,
total_failed_tasks_limit=1,
)

with pytest.raises(AquteTooManyTasksFailedError) as exc:
await aqute.apply_to_all(range(1, 6))
assert "limit reached: 1" in str(exc.value)


@pytest.mark.asyncio
@pytest.mark.parametrize("retry_count", [0, 1], ids=["no_retry", "one_retry"])
async def test_not_enough_failed_tasks_for_error(retry_count: int):
Expand Down

0 comments on commit 5f7e0ce

Please sign in to comment.