Skip to content

Commit fddb8be

Browse files
committed
Allow to schedule a task using its function
Using the only the task name when scheduling a task is limiting as it makes refactoring less obvious than using the function directly. However having the possibility to use the name for scheduling is still important when importing the function is not possible.
1 parent ee7c53b commit fddb8be

File tree

8 files changed

+106
-36
lines changed

8 files changed

+106
-36
lines changed

README.rst

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ Create task and schedule two jobs, one executed now and one later:
4444
4545
4646
# Schedule a job to be executed ASAP
47-
spin.schedule('compute', 5, 3)
47+
spin.schedule(compute, 5, 3)
4848
4949
print('Starting workers, ^C to quit')
5050
spin.start_workers()

doc/index.rst

+18
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,24 @@ Quickstart
2323

2424
.. literalinclude:: ../examples/quickstart.py
2525

26+
The :class:`Engine` is the central part of Spinach, it allows to define tasks,
27+
schedule jobs to execute in the background and start background workers.
28+
:ref:`More details <engine>`.
29+
30+
The Broker is the backend that background workers use to retrieve jobs to
31+
execute. Spinach provides two brokers: MemoryBroker for development and
32+
RedisBroker for production.
33+
34+
The :meth:`Engine.task` decorator is used to register tasks. It requires at
35+
least a `name` to identify the task, but other options can be given to
36+
customize how the task behaves. :ref:`More details <tasks>`.
37+
38+
Background jobs can then be scheduled by using either the task name or the task
39+
function::
40+
41+
spin.schedule('compute', 5, 3) # identify a task by its name
42+
spin.schedule(compute, 5, 3) # identify a task by its function
43+
2644
Getting started with spinach:
2745

2846
.. toctree::

examples/queues.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ def slow():
2121
time.sleep(10)
2222

2323

24-
spin.schedule('slow')
25-
spin.schedule('fast')
24+
spin.schedule(slow)
25+
spin.schedule(fast)
2626

2727
spin.start_workers(number=1, queue='high-priority', stop_when_queue_empty=True)

examples/quickstart.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ def compute(a, b):
99

1010

1111
# Schedule a job to be executed ASAP
12-
spin.schedule('compute', 5, 3)
12+
spin.schedule(compute, 5, 3)
1313

1414
print('Starting workers, ^C to quit')
1515
spin.start_workers()

spinach/contrib/spinachd/mail.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from django.core.mail import EmailMessage
77
from django.core.mail.backends.base import BaseEmailBackend
88

9-
from .tasks import tasks
9+
from .tasks import tasks, send_emails
1010

1111

1212
class BackgroundEmailBackend(BaseEmailBackend):
@@ -17,7 +17,7 @@ def send_messages(self, messages):
1717
message.message() # .message() triggers header validation
1818
msg_count += 1
1919
messages = serialize_email_messages(messages)
20-
tasks.schedule('spinachd:send_emails', messages)
20+
tasks.schedule(send_emails, messages)
2121

2222
return msg_count
2323

spinach/engine.py

+11-11
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from logging import getLogger
33
import threading
44

5-
from .task import Tasks, Batch
5+
from .task import Tasks, Batch, Schedulable
66
from .utils import run_forever, handle_sigterm
77
from .job import Job, JobStatus, advance_job_status
88
from .brokers.base import Broker
@@ -58,31 +58,31 @@ def attach_tasks(self, tasks: Tasks):
5858
self._tasks.update(tasks)
5959
tasks._spin = self
6060

61-
def execute(self, task_name: str, *args, **kwargs):
62-
return self._tasks.get(task_name).func(*args, **kwargs)
61+
def execute(self, task: Schedulable, *args, **kwargs):
62+
return self._tasks.get(task).func(*args, **kwargs)
6363

64-
def schedule(self, task_name: str, *args, **kwargs):
64+
def schedule(self, task: Schedulable, *args, **kwargs):
6565
"""Schedule a job to be executed as soon as possible.
6666
67-
:arg task_name: name of the task to execute
67+
:arg task: the task or its name to execute in the background
6868
:arg args: args to be passed to the task function
6969
:arg kwargs: kwargs to be passed to the task function
7070
"""
7171
at = datetime.now(timezone.utc)
72-
return self.schedule_at(task_name, at, *args, **kwargs)
72+
return self.schedule_at(task, at, *args, **kwargs)
7373

74-
def schedule_at(self, task_name: str, at: datetime, *args, **kwargs):
74+
def schedule_at(self, task: Schedulable, at: datetime, *args, **kwargs):
7575
"""Schedule a job to be executed in the future.
7676
77-
:arg task_name: name of the task to execute
77+
:arg task: the task or its name to execute in the background
7878
:arg at: date at which the job should start. It is advised to pass a
7979
timezone aware datetime to lift any ambiguity. However if a
8080
timezone naive datetime if given, it will be assumed to
8181
contain UTC time.
8282
:arg args: args to be passed to the task function
8383
:arg kwargs: kwargs to be passed to the task function
8484
"""
85-
task = self._tasks.get(task_name)
85+
task = self._tasks.get(task)
8686
job = Job(task.name, task.queue, at, task.max_retries, task_args=args,
8787
task_kwargs=kwargs)
8888
return self._broker.enqueue_jobs([job])
@@ -96,8 +96,8 @@ def schedule_batch(self, batch: Batch):
9696
:arg batch: :class:`Batch` instance containing jobs to schedule
9797
"""
9898
jobs = list()
99-
for task_name, at, args, kwargs in batch.jobs_to_create:
100-
task = self._tasks.get(task_name)
99+
for task, at, args, kwargs in batch.jobs_to_create:
100+
task = self._tasks.get(task)
101101
jobs.append(
102102
Job(task.name, task.queue, at, task.max_retries,
103103
task_args=args, task_kwargs=kwargs)

spinach/task.py

+30-15
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from datetime import datetime, timezone, timedelta
22
import functools
33
import json
4-
from typing import Optional, Callable, List
4+
from typing import Optional, Callable, List, Union
55
from numbers import Number
66

77
from . import const, exc
@@ -29,6 +29,10 @@ def serialize(self):
2929
'periodicity': periodicity
3030
}, sort_keys=True)
3131

32+
@property
33+
def task_name(self):
34+
return self.name
35+
3236
def __repr__(self):
3337
return 'Task({}, {}, {}, {}, {})'.format(
3438
self.func, self.name, self.queue, self.max_retries,
@@ -45,6 +49,9 @@ def __eq__(self, other):
4549
return True
4650

4751

52+
Schedulable = Union[str, Callable, Task]
53+
54+
4855
class Tasks:
4956
"""Registry for tasks to be used by Spinach.
5057
@@ -76,8 +83,12 @@ def names(self) -> List[str]:
7683
def tasks(self) -> dict:
7784
return self._tasks
7885

79-
def get(self, name: str):
80-
task = self._tasks.get(name)
86+
def get(self, name: Schedulable) -> Task:
87+
try:
88+
task_name = name.task_name
89+
except AttributeError:
90+
task_name = name
91+
task = self._tasks.get(task_name)
8192
if task is not None:
8293
return task
8394

@@ -110,6 +121,10 @@ def task(self, func: Optional[Callable]=None, name: Optional[str]=None,
110121
self.add(func, name=name, queue=queue, max_retries=max_retries,
111122
periodicity=periodicity)
112123

124+
# Add an attribute to the function to be able to conveniently use it as
125+
# spin.schedule(function) instead of spin.schedule('task_name')
126+
func.task_name = name
127+
113128
return func
114129

115130
def add(self, func: Callable, name: Optional[str]=None,
@@ -161,23 +176,23 @@ def _require_attached_tasks(self):
161176
'a Spinach Engine.'
162177
)
163178

164-
def schedule(self, task_name: str, *args, **kwargs):
179+
def schedule(self, task: Schedulable, *args, **kwargs):
165180
"""Schedule a job to be executed as soon as possible.
166181
167-
:arg task_name: name of the task to execute in the background
182+
:arg task: the task or its name to execute in the background
168183
:arg args: args to be passed to the task function
169184
:arg kwargs: kwargs to be passed to the task function
170185
171186
This method can only be used once tasks have been attached to a
172187
Spinach :class:`Engine`.
173188
"""
174189
self._require_attached_tasks()
175-
self._spin.schedule(task_name, *args, **kwargs)
190+
self._spin.schedule(task, *args, **kwargs)
176191

177-
def schedule_at(self, task_name: str, at: datetime, *args, **kwargs):
192+
def schedule_at(self, task: Schedulable, at: datetime, *args, **kwargs):
178193
"""Schedule a job to be executed in the future.
179194
180-
:arg task_name: name of the task to execute in the background
195+
:arg task: the task or its name to execute in the background
181196
:arg at: Date at which the job should start. It is advised to pass a
182197
timezone aware datetime to lift any ambiguity. However if a
183198
timezone naive datetime if given, it will be assumed to
@@ -189,7 +204,7 @@ def schedule_at(self, task_name: str, at: datetime, *args, **kwargs):
189204
Spinach :class:`Engine`.
190205
"""
191206
self._require_attached_tasks()
192-
self._spin.schedule_at(task_name, at, *args, **kwargs)
207+
self._spin.schedule_at(task, at, *args, **kwargs)
193208

194209
def schedule_batch(self, batch: 'Batch'):
195210
"""Schedule many jobs at once.
@@ -225,28 +240,28 @@ class Batch:
225240
def __init__(self):
226241
self.jobs_to_create = list()
227242

228-
def schedule(self, task_name: str, *args, **kwargs):
243+
def schedule(self, task: Schedulable, *args, **kwargs):
229244
"""Add a job to be executed ASAP to the batch.
230245
231-
:arg task_name: name of the task to execute in the background
246+
:arg task: the task or its name to execute in the background
232247
:arg args: args to be passed to the task function
233248
:arg kwargs: kwargs to be passed to the task function
234249
"""
235250
at = datetime.now(timezone.utc)
236-
self.schedule_at(task_name, at, *args, **kwargs)
251+
self.schedule_at(task, at, *args, **kwargs)
237252

238-
def schedule_at(self, task_name: str, at: datetime, *args, **kwargs):
253+
def schedule_at(self, task: Schedulable, at: datetime, *args, **kwargs):
239254
"""Add a job to be executed in the future to the batch.
240255
241-
:arg task_name: name of the task to execute in the background
256+
:arg task: the task or its name to execute in the background
242257
:arg at: Date at which the job should start. It is advised to pass a
243258
timezone aware datetime to lift any ambiguity. However if a
244259
timezone naive datetime if given, it will be assumed to
245260
contain UTC time.
246261
:arg args: args to be passed to the task function
247262
:arg kwargs: kwargs to be passed to the task function
248263
"""
249-
self.jobs_to_create.append((task_name, at, args, kwargs))
264+
self.jobs_to_create.append((task, at, args, kwargs))
250265

251266

252267
class RetryException(Exception):

tests/test_task.py

+41-4
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,13 @@ def bar():
104104
pass
105105

106106
assert 'foo' in str(tasks.tasks['foo'].func)
107+
assert foo.task_name == 'foo'
107108
assert tasks.tasks['foo'].name == 'foo'
108109
assert tasks.tasks['foo'].queue == 'tasks_queue'
109110
assert tasks.tasks['foo'].max_retries == const.DEFAULT_MAX_RETRIES
110111

111112
assert 'bar' in str(tasks.tasks['bar'].func)
113+
assert bar.task_name == 'bar'
112114
assert tasks.tasks['bar'].name == 'bar'
113115
assert tasks.tasks['bar'].queue == 'task_queue'
114116
assert tasks.tasks['bar'].max_retries == 20
@@ -145,18 +147,53 @@ def test_tasks_names():
145147
assert sorted(tasks.names) == ['bar', 'foo']
146148

147149

148-
def test_tasks_get():
150+
def test_tasks_get_by_name():
149151
tasks = Tasks()
150-
with pytest.raises(exc.UnknownTask):
151-
tasks.get('foo')
152-
153152
tasks.add(print, 'foo')
153+
154154
r = tasks.get('foo')
155155
assert isinstance(r, Task)
156156
assert r.name == 'foo'
157157
assert r.func == print
158158

159159

160+
def test_tasks_get_by_function():
161+
tasks = Tasks()
162+
163+
@tasks.task(name='foo')
164+
def foo():
165+
pass
166+
167+
r = tasks.get(foo)
168+
assert isinstance(r, Task)
169+
assert r.name == 'foo'
170+
assert r.func == foo
171+
172+
173+
def test_tasks_get_by_task_object(task):
174+
tasks = Tasks()
175+
tasks._tasks[task.name] = task
176+
177+
r = tasks.get(task)
178+
assert isinstance(r, Task)
179+
assert r.name == task.name
180+
assert r.func == task.func
181+
182+
183+
def test_tasks_get_by_unknown_or_wrong_object():
184+
tasks = Tasks()
185+
with pytest.raises(exc.UnknownTask):
186+
tasks.get('foo')
187+
with pytest.raises(exc.UnknownTask):
188+
tasks.get(None)
189+
with pytest.raises(exc.UnknownTask):
190+
tasks.get(object())
191+
with pytest.raises(exc.UnknownTask):
192+
tasks.get(b'foo')
193+
with pytest.raises(exc.UnknownTask):
194+
tasks.get(RuntimeError)
195+
196+
160197
def test_tasks_scheduling(task):
161198
tasks = Tasks()
162199
tasks.add(print, 'write_to_stdout', queue='foo_queue')

0 commit comments

Comments
 (0)