-
Notifications
You must be signed in to change notification settings - Fork 34
/
flask_celery.py
181 lines (122 loc) · 4.74 KB
/
flask_celery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# -*- coding: utf-8 -*-
"""
flask.ext.celery
~~~~~~~~~~~~~~~~
Celery integration for Flask.
:copyright: (c) 2010-2011 Ask Solem <[email protected]>
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
import argparse
from celery.app import App, AppPickler, current_app as current_celery
from celery.loaders import default as _default
from celery.utils import get_full_cls_name
from werkzeug import cached_property
from flask.ext import script
class FlaskLoader(_default.Loader):
    """Celery loader that takes its settings from the Flask application."""

    def read_configuration(self):
        """Build the Celery settings from the bound Flask app's ``config``.

        The Flask application is reached through ``self.app.flask_app``.
        ``configured`` is only flagged once ``setup_settings`` has returned.
        """
        flask_config = self.app.flask_app.config
        celery_settings = self.setup_settings(flask_config)
        self.configured = True
        return celery_settings
class FlaskAppPickler(AppPickler):
    """App pickler that also carries the Flask application along."""

    def build_kwargs(self, flask_app, *args):
        """Return the standard app kwargs extended with ``flask_app``.

        ``flask_app`` is the leading positional value; everything after it
        is handed unchanged to ``build_standard_kwargs``.
        """
        app_kwargs = self.build_standard_kwargs(*args)
        app_kwargs["flask_app"] = flask_app
        return app_kwargs
class Celery(App):
    """Celery application object that keeps a reference to a Flask app."""

    # Pickler class that knows about the extra ``flask_app`` argument.
    Pickler = FlaskAppPickler
    # Class-level default; replaced per instance in ``__init__``.
    flask_app = None
    # Loader is referenced by its full class name rather than the class.
    loader_cls = get_full_cls_name(FlaskLoader)

    def __init__(self, flask_app=None, *args, **kwargs):
        """Remember ``flask_app`` and pass the remaining arguments to ``App``.

        The attribute is assigned before the base initializer runs.
        """
        self.flask_app = flask_app
        super(Celery, self).__init__(*args, **kwargs)

    def __reduce_args__(self):
        """Return the base class's reduce args prefixed with the Flask app."""
        own_args = (self.flask_app, )
        inherited_args = super(Celery, self).__reduce_args__()
        return own_args + inherited_args
def to_Option(option, typemap={"int": int, "float": float, "string": str}):
    """Convert an optparse-style option object into a ``script.Option``.

    :param option: object whose instance attributes describe the option
        (``type``, ``action``, ``default``, ``_short_opts``, ``_long_opts``,
        ...), as found on an ``optparse.Option``.
    :param typemap: maps optparse type names to the callables argparse
        expects; names not in the map are passed through unchanged.
    :returns: a ``script.Option`` built from the option strings and the
        converted keyword arguments.

    The option passed in is left untouched, so the same option object can be
    converted any number of times.
    """
    # vars() returns the live instance __dict__; work on a copy so that the
    # pops/assignments below do not strip attributes off the caller's object.
    kwargs = dict(vars(option))

    # convert type strings to real types.
    type_ = kwargs["type"]
    kwargs["type"] = typemap.get(type_) or type_

    # callback not supported by argparse, must use type|action instead.
    cb = kwargs.pop("callback", None)
    cb_args = kwargs.pop("callback_args", None) or ()
    cb_kwargs = kwargs.pop("callback_kwargs", None) or {}

    # action specific conversions
    action = kwargs["action"]
    if action == "store_true":
        # Explicit loop: map() used for side effects is lazy on Python 3
        # and would never actually remove the keys.
        for unsupported in ("const", "type", "nargs", "metavar", "choices"):
            kwargs.pop(unsupported, None)
    elif action == "store":
        kwargs.pop("nargs", None)

    # ("NO", "DEFAULT") is the marker used for "no default given".
    if kwargs["default"] == ("NO", "DEFAULT"):
        kwargs["default"] = None

    if action == "callback":

        class _action_cls(argparse.Action):
            """argparse action that invokes the original callback."""

            def __call__(self, parser, namespace, values, option_string=None):
                # NOTE(review): only the extra callback args are forwarded;
                # optparse itself calls callbacks with
                # (option, opt_str, value, parser, *args, **kwargs).
                # Confirm the callbacks used here accept this call shape.
                return cb(*cb_args, **cb_kwargs)

        kwargs["action"] = _action_cls
        # NOTE(review): setdefault only applies when "nargs" is absent from
        # the option's attributes; an existing value (even None) is kept.
        kwargs.setdefault("nargs", 0)

    args = kwargs.pop("_short_opts") + kwargs.pop("_long_opts")
    return script.Option(*args, **kwargs)
class Command(script.Command):
    """Base class for the commands in this module; holds the given app."""

    def __init__(self, app):
        """Store ``app`` on the instance, then run the base initializer."""
        self.app = app
        super(Command, self).__init__()
class celeryd(Command):
    """Runs a Celery worker node."""

    def get_options(self):
        """Return the worker command's options converted via ``to_Option``.

        Falsy conversion results are dropped.
        """
        converted = map(to_Option, self.worker.get_options())
        return filter(None, converted)

    def run(self, **kwargs):
        """Run the worker with the parsed option values.

        Any value that is a non-empty list is replaced by its first element
        before the values are handed to the worker command.
        """
        for key, value in kwargs.items():
            if not isinstance(value, list) or not value:
                continue
            kwargs[key] = value[0]
        self.worker.run(**kwargs)

    @cached_property
    def worker(self):
        """``WorkerCommand`` bound to the current Celery app.

        The celery import is deferred until the property is first read.
        """
        from celery.bin.celeryd import WorkerCommand

        celery_app = current_celery()
        return WorkerCommand(app=celery_app)
class celerybeat(Command):
    """Runs the Celery periodic task scheduler."""

    def get_options(self):
        """Return the beat command's options converted via ``to_Option``.

        Falsy conversion results are dropped.
        """
        converted = map(to_Option, self.beat.get_options())
        return filter(None, converted)

    def run(self, **kwargs):
        """Run the beat command with the parsed option values."""
        self.beat.run(**kwargs)

    @cached_property
    def beat(self):
        """``BeatCommand`` bound to the current Celery app.

        The celery import is deferred until the property is first read.
        """
        from celery.bin.celerybeat import BeatCommand

        celery_app = current_celery()
        return BeatCommand(app=celery_app)
class celeryev(Command):
    """Runs the Celery curses event monitor."""

    # NOTE(review): not read anywhere in this module; presumably consumed by
    # the base command class -- confirm before removing.
    command = None

    def get_options(self):
        """Return the event command's options converted via ``to_Option``.

        Falsy conversion results are dropped.
        """
        converted = map(to_Option, self.ev.get_options())
        return filter(None, converted)

    def run(self, **kwargs):
        """Run the event command with the parsed option values."""
        self.ev.run(**kwargs)

    @cached_property
    def ev(self):
        """``EvCommand`` bound to the current Celery app.

        The celery import is deferred until the property is first read.
        """
        from celery.bin.celeryev import EvCommand

        celery_app = current_celery()
        return EvCommand(app=celery_app)
class celeryctl(Command):
    # Hands the remaining command-line arguments to celery's own celeryctl
    # implementation.
    # NOTE(review): unlike the sibling commands this class has no docstring;
    # confirm whether the command framework uses it as help text.

    def get_options(self):
        """Declare no options of its own."""
        return ()

    def handle(self, app, prog, name, remaining_args):
        """Execute celeryctl with ``remaining_args``.

        When no arguments are left over, ``help`` is run instead.
        """
        if not remaining_args:
            remaining_args = ["help"]

        from celery.bin.celeryctl import celeryctl as ctl

        argv = ["%s celeryctl" % prog] + remaining_args
        control = ctl(current_celery())
        control.execute_from_commandline(argv)
class camqadm(Command):
    """Runs the Celery AMQP admin shell/utility."""

    def get_options(self):
        """Declare no options of its own."""
        return ()

    def handle(self, app, prog, name, remaining_args):
        """Run ``AMQPAdminCommand`` with ``remaining_args`` as positionals.

        Returns whatever the admin command's ``run`` returns.
        """
        from celery.bin.camqadm import AMQPAdminCommand

        admin = AMQPAdminCommand(app=current_celery())
        return admin.run(*remaining_args)
# Command name -> command class, as registered by ``install_commands``.
commands = dict([
    ("celeryd", celeryd),
    ("celerybeat", celerybeat),
    ("celeryev", celeryev),
    ("celeryctl", celeryctl),
    ("camqadm", camqadm),
])
def install_commands(manager):
    """Register every command in ``commands`` on ``manager``.

    Each command class is instantiated with ``manager.app`` and added under
    its name in the ``commands`` mapping.
    """
    for name in commands:
        command_cls = commands[name]
        manager.add_command(name, command_cls(manager.app))