Skip to content

Commit

Permalink
Support for async concurrent tasks on the same worker process
Browse files Browse the repository at this point in the history
  • Loading branch information
alanhamlett committed Nov 12, 2024
1 parent 9e1ce98 commit c654d77
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 85 deletions.
28 changes: 21 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ wakaq = WakaQ(
# the current machine.
concurrency="cores*4",

# Number of concurrent asyncio tasks per worker process. Must be an int or
# str which evaluates to an int. The variable "cores" is replaced with the
# number of processors on the current machine. Default is zero for no limit.
async_concurrency=50,

# Raise SoftTimeout in a task if it runs longer than 30 seconds. Can also be set per
# task or queue. If no soft timeout set, tasks can run forever.
soft_timeout=30, # seconds
Expand Down Expand Up @@ -104,17 +109,23 @@ def mytask(x, y):


@wakaq.task
def anothertask():
def a_cpu_intensive_task():
print("hello world")


@wakaq.task
async def an_io_intensive_task():
print("hello world")


@wakaq.wrap_tasks_with
def custom_task_decorator(fn):
def inner(*args, **kwargs):
# do something before each task runs
fn(*args, **kwargs)
# do something after each task runs
return inner
def custom_task_decorator(fn, args, kwargs):
# do something before each task runs, for ex: `with app.app_context():`
if inspect.iscoroutinefunction(fn):
await fn(*payload["args"], **payload["kwargs"])
else:
fn(*payload["args"], **payload["kwargs"])
# do something after each task runs


if __name__ == '__main__':
Expand All @@ -130,6 +141,9 @@ if __name__ == '__main__':

# print hello world on a worker somewhere, after 10 seconds from now
anothertask.delay(eta=timedelta(seconds=10))

# print hello world on a worker concurrently, even if you only have 1 worker process
an_io_intensive_task.delay()
```

## Deploying
Expand Down
7 changes: 5 additions & 2 deletions wakaq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class WakaQ:
soft_timeout = None
hard_timeout = None
concurrency = 0
async_concurrency = 0
schedules = []
exclude_queues = []
max_retries = None
Expand Down Expand Up @@ -53,6 +54,7 @@ def __init__(
password=None,
db=0,
concurrency=0,
async_concurrency=0,
exclude_queues=[],
max_retries=None,
soft_timeout=None,
Expand Down Expand Up @@ -82,6 +84,7 @@ def __init__(
self.broker_keys = [x.broker_key for x in self.queues if x.name not in self.exclude_queues]
self.schedules = [CronTask.create(x) for x in schedules]
self.concurrency = self._format_concurrency(concurrency)
self.async_concurrency = self._format_concurrency(async_concurrency, is_async=True)
self.soft_timeout = soft_timeout.total_seconds() if isinstance(soft_timeout, timedelta) else soft_timeout
self.hard_timeout = hard_timeout.total_seconds() if isinstance(hard_timeout, timedelta) else hard_timeout
self.wait_timeout = wait_timeout.total_seconds() if isinstance(wait_timeout, timedelta) else wait_timeout
Expand Down Expand Up @@ -245,10 +248,10 @@ def _default_priority(self, queue, lowest_priority):
queue.priority = lowest_priority + 1
return queue

def _format_concurrency(self, concurrency):
def _format_concurrency(self, concurrency, is_async=None):
if not concurrency:
return 0
try:
return int(safe_eval(str(concurrency), {"cores": multiprocessing.cpu_count()}))
except Exception as e:
raise Exception(f"Error parsing concurrency: {e}")
raise Exception(f"Error parsing {'async_' if is_async else ''}concurrency: {e}")
20 changes: 20 additions & 0 deletions wakaq/task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import asyncio
import inspect
import threading
from datetime import timedelta

from .queue import Queue
Expand Down Expand Up @@ -43,13 +46,30 @@ def _delay(self, *args, **kwargs):
eta = kwargs.pop("eta", None)

if self.wakaq.synchronous_mode:
if inspect.iscoroutinefunction(self.fn):
loop = self.get_event_loop()
coroutine = self.fn(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coroutine, loop).result()
return self.fn(*args, **kwargs)

if eta:
self.wakaq._enqueue_with_eta(self.name, queue, args, kwargs, eta)
else:
self.wakaq._enqueue_at_end(self.name, queue, args, kwargs)

def get_event_loop(self):
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

if not loop.is_running():
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

return loop

def _broadcast(self, *args, **kwargs) -> int:
"""Run task in the background on all workers.
Expand Down
6 changes: 3 additions & 3 deletions wakaq/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


def import_app(app):
""" Import and return the WakaQ instance from the specified module path."""
"""Import and return the WakaQ instance from the specified module path."""

cwd = os.getcwd()
if cwd not in sys.path:
Expand All @@ -29,7 +29,7 @@ def import_app(app):


def inspect(app):
""" Return the queues and their respective pending task counts, and the number of workers connected."""
"""Return the queues and their respective pending task counts, and the number of workers connected."""

queues = {}
for queue in app.queues:
Expand Down Expand Up @@ -267,5 +267,5 @@ def exception_in_chain(e, exception_type):
while (e.__cause__ or e.__context__) is not None:
if isinstance((e.__cause__ or e.__context__), exception_type):
return True
e = (e.__cause__ or e.__context__)
e = e.__cause__ or e.__context__
return False
Loading

0 comments on commit c654d77

Please sign in to comment.