diff --git a/docs/patterns/celery.rst b/docs/patterns/celery.rst index 228a04a8aa..2e9a43a731 100644 --- a/docs/patterns/celery.rst +++ b/docs/patterns/celery.rst @@ -1,105 +1,242 @@ -Celery Background Tasks -======================= +Background Tasks with Celery +============================ -If your application has a long running task, such as processing some uploaded -data or sending email, you don't want to wait for it to finish during a -request. Instead, use a task queue to send the necessary data to another -process that will run the task in the background while the request returns -immediately. +If your application has a long running task, such as processing some uploaded data or +sending email, you don't want to wait for it to finish during a request. Instead, use a +task queue to send the necessary data to another process that will run the task in the +background while the request returns immediately. + +`Celery`_ is a powerful task queue that can be used for simple background tasks as well +as complex multi-stage programs and schedules. This guide will show you how to configure +Celery using Flask. Read Celery's `First Steps with Celery`_ guide to learn how to use +Celery itself. + +.. _Celery: https://celery.readthedocs.io +.. _First Steps with Celery: https://celery.readthedocs.io/en/latest/getting-started/first-steps-with-celery.html + +The Flask repository contains `an example `_ +based on the information on this page, which also shows how to use JavaScript to submit +tasks and poll for progress and results. -Celery is a powerful task queue that can be used for simple background tasks -as well as complex multi-stage programs and schedules. This guide will show you -how to configure Celery using Flask, but assumes you've already read the -`First Steps with Celery `_ -guide in the Celery documentation. Install ------- -Celery is a separate Python package. Install it from PyPI using pip:: +Install Celery from PyPI, for example using pip: + +.. code-block:: text $ pip install celery -Configure ---------- -The first thing you need is a Celery instance, this is called the celery -application. It serves the same purpose as the :class:`~flask.Flask` -object in Flask, just for Celery. Since this instance is used as the -entry-point for everything you want to do in Celery, like creating tasks -and managing workers, it must be possible for other modules to import it. +Integrate Celery with Flask +--------------------------- -For instance you can place this in a ``tasks`` module. While you can use -Celery without any reconfiguration with Flask, it becomes a bit nicer by -subclassing tasks and adding support for Flask's application contexts and -hooking it up with the Flask configuration. +You can use Celery without any integration with Flask, but it's convenient to configure +it through Flask's config, and to let tasks access the Flask application. -This is all that is necessary to integrate Celery with Flask: +Celery uses similar ideas to Flask, with a ``Celery`` app object that has configuration +and registers tasks. While creating a Flask app, use the following code to create and +configure a Celery app as well. .. code-block:: python - from celery import Celery + from celery import Celery, Task - def make_celery(app): - celery = Celery(app.import_name) - celery.conf.update(app.config["CELERY_CONFIG"]) - - class ContextTask(celery.Task): - def __call__(self, *args, **kwargs): + def celery_init_app(app: Flask) -> Celery: + class FlaskTask(Task): + def __call__(self, *args: object, **kwargs: object) -> object: with app.app_context(): return self.run(*args, **kwargs) - celery.Task = ContextTask - return celery - -The function creates a new Celery object, configures it with the broker -from the application config, updates the rest of the Celery config from -the Flask config and then creates a subclass of the task that wraps the -task execution in an application context. + celery_app = Celery(app.name, task_cls=FlaskTask) + celery_app.config_from_object(app.config["CELERY"]) + celery_app.set_default() + app.extensions["celery"] = celery_app + return celery_app -.. note:: - Celery 5.x deprecated uppercase configuration keys, and 6.x will - remove them. See their official `migration guide`_. +This creates and returns a ``Celery`` app object. Celery `configuration`_ is taken from +the ``CELERY`` key in the Flask configuration. The Celery app is set as the default, so +that it is seen during each request. The ``Task`` subclass automatically runs task +functions with a Flask app context active, so that services like your database +connections are available. -.. _migration guide: https://docs.celeryproject.org/en/stable/userguide/configuration.html#conf-old-settings-map. +.. _configuration: https://celery.readthedocs.io/en/stable/userguide/configuration.html -An example task ---------------- +Here's a basic ``example.py`` that configures Celery to use Redis for communication. We +enable a result backend, but ignore results by default. This allows us to store results +only for tasks where we care about the result. -Let's write a task that adds two numbers together and returns the result. We -configure Celery's broker and backend to use Redis, create a ``celery`` -application using the factory from above, and then use it to define the task. :: +.. code-block:: python from flask import Flask - flask_app = Flask(__name__) - flask_app.config.update(CELERY_CONFIG={ - 'broker_url': 'redis://localhost:6379', - 'result_backend': 'redis://localhost:6379', - }) - celery = make_celery(flask_app) + app = Flask(__name__) + app.config.from_mapping( + CELERY=dict( + broker_url="redis://localhost", + result_backend="redis://localhost", + task_ignore_result=True, + ), + ) + celery_app = celery_init_app(app) + +Point the ``celery worker`` command at this and it will find the ``celery_app`` object. + +.. code-block:: text + + $ celery -A example worker --loglevel INFO + +You can also run the ``celery beat`` command to run tasks on a schedule. See Celery's +docs for more information about defining schedules. + +.. code-block:: text + + $ celery -A example beat --loglevel INFO + + +Application Factory +------------------- + +When using the Flask application factory pattern, call the ``celery_init_app`` function +inside the factory. It sets ``app.extensions["celery"]`` to the Celery app object, which +can be used to get the Celery app from the Flask app returned by the factory. + +.. code-block:: python + + def create_app() -> Flask: + app = Flask(__name__) + app.config.from_mapping( + CELERY=dict( + broker_url="redis://localhost", + result_backend="redis://localhost", + task_ignore_result=True, + ), + ) + app.config.from_prefixed_env() + celery_init_app(app) + return app + +To use ``celery`` commands, Celery needs an app object, but that's no longer directly +available. Create a ``make_celery.py`` file that calls the Flask app factory and gets +the Celery app from the returned Flask app. + +.. code-block:: python + + from example import create_app + + flask_app = create_app() + celery_app = flask_app.extensions["celery"] + +Point the ``celery`` command to this file. + +.. code-block:: text + + $ celery -A make_celery worker --loglevel INFO + $ celery -A make_celery beat --loglevel INFO + - @celery.task() - def add_together(a, b): +Defining Tasks +-------------- + +Using ``@celery_app.task`` to decorate task functions requires access to the +``celery_app`` object, which won't be available when using the factory pattern. It also +means that the decorated tasks are tied to the specific Flask and Celery app instances, +which could be an issue during testing if you change configuration for a test. + +Instead, use Celery's ``@shared_task`` decorator. This creates task objects that will +access whatever the "current app" is, which is a similar concept to Flask's blueprints +and app context. This is why we called ``celery_app.set_default()`` above. + +Here's an example task that adds two numbers together and returns the result. + +.. code-block:: python + + from celery import shared_task + + @shared_task(ignore_result=False) + def add_together(a: int, b: int) -> int: return a + b -This task can now be called in the background:: +Earlier, we configured Celery to ignore task results by default. Since we want to know +the return value of this task, we set ``ignore_result=False``. On the other hand, a task +that didn't need a result, such as sending an email, wouldn't set this. + + +Calling Tasks +------------- + +The decorated function becomes a task object with methods to call it in the background. +The simplest way is to use the ``delay(*args, **kwargs)`` method. See Celery's docs for +more methods. + +A Celery worker must be running to run the task. Starting a worker is shown in the +previous sections. + +.. code-block:: python + + from flask import request - result = add_together.delay(23, 42) - result.wait() # 65 + @app.post("/add") + def start_add() -> dict[str, object]: + a = request.form.get("a", type=int) + b = request.form.get("b", type=int) + result = add_together.delay(a, b) + return {"result_id": result.id} -Run a worker ------------- +The route doesn't get the task's result immediately. That would defeat the purpose by +blocking the response. Instead, we return the running task's result id, which we can use +later to get the result. -If you jumped in and already executed the above code you will be -disappointed to learn that ``.wait()`` will never actually return. -That's because you also need to run a Celery worker to receive and execute the -task. :: - $ celery -A your_application.celery worker +Getting Results +--------------- + +To fetch the result of the task we started above, we'll add another route that takes the +result id we returned before. We return whether the task is finished (ready), whether it +finished successfully, and what the return value (or error) was if it is finished. + +.. code-block:: python + + from celery.result import AsyncResult + + @app.get("/result/") + def task_result(id: str) -> dict[str, object]: + result = AsyncResult(id) + return { + "ready": result.ready(), + "successful": result.successful(), + "value": result.result if result.ready() else None, + } + +Now you can start the task using the first route, then poll for the result using the +second route. This keeps the Flask request workers from being blocked waiting for tasks +to finish. + +The Flask repository contains `an example `_ +using JavaScript to submit tasks and poll for progress and results. + + +Passing Data to Tasks +--------------------- + +The "add" task above took two integers as arguments. To pass arguments to tasks, Celery +has to serialize them to a format that it can pass to other processes. Therefore, +passing complex objects is not recommended. For example, it would be impossible to pass +a SQLAlchemy model object, since that object is probably not serializable and is tied to +the session that queried it. + +Pass the minimal amount of data necessary to fetch or recreate any complex data within +the task. Consider a task that will run when the logged in user asks for an archive of +their data. The Flask request knows the logged in user, and has the user object queried +from the database. It got that by querying the database for a given id, so the task can +do the same thing. Pass the user's id rather than the user object. + +.. code-block:: python -The ``your_application`` string has to point to your application's package -or module that creates the ``celery`` object. + @shared_task + def generate_user_archive(user_id: str) -> None: + user = db.session.get(User, user_id) + ... -Now that the worker is running, ``wait`` will return the result once the task -is finished. + generate_user_archive.delay(current_user.id) diff --git a/examples/celery/README.md b/examples/celery/README.md new file mode 100644 index 0000000000..91782019e2 --- /dev/null +++ b/examples/celery/README.md @@ -0,0 +1,27 @@ +Background Tasks with Celery +============================ + +This example shows how to configure Celery with Flask, how to set up an API for +submitting tasks and polling results, and how to use that API with JavaScript. See +[Flask's documentation about Celery](https://flask.palletsprojects.com/patterns/celery/). + +From this directory, create a virtualenv and install the application into it. Then run a +Celery worker. + +```shell +$ python3 -m venv .venv +$ . ./.venv/bin/activate +$ pip install -r requirements.txt && pip install -e . +$ celery -A make_celery worker --loglevel INFO +``` + +In a separate terminal, activate the virtualenv and run the Flask development server. + +```shell +$ . ./.venv/bin/activate +$ flask -A task_app --debug run +``` + +Go to http://localhost:5000/ and use the forms to submit tasks. You can see the polling +requests in the browser dev tools and the Flask logs. You can see the tasks submitting +and completing in the Celery logs. diff --git a/examples/celery/make_celery.py b/examples/celery/make_celery.py new file mode 100644 index 0000000000..f7d138e642 --- /dev/null +++ b/examples/celery/make_celery.py @@ -0,0 +1,4 @@ +from task_app import create_app + +flask_app = create_app() +celery_app = flask_app.extensions["celery"] diff --git a/examples/celery/pyproject.toml b/examples/celery/pyproject.toml new file mode 100644 index 0000000000..88ba6b960c --- /dev/null +++ b/examples/celery/pyproject.toml @@ -0,0 +1,11 @@ +[project] +name = "flask-example-celery" +version = "1.0.0" +description = "Example Flask application with Celery background tasks." +readme = "README.md" +requires-python = ">=3.7" +dependencies = ["flask>=2.2.2", "celery[redis]>=5.2.7"] + +[build-system] +requires = ["setuptools"] +build-backend = "setuptools.build_meta" diff --git a/examples/celery/requirements.txt b/examples/celery/requirements.txt new file mode 100644 index 0000000000..b283401366 --- /dev/null +++ b/examples/celery/requirements.txt @@ -0,0 +1,56 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile pyproject.toml +# +amqp==5.1.1 + # via kombu +async-timeout==4.0.2 + # via redis +billiard==3.6.4.0 + # via celery +celery[redis]==5.2.7 + # via flask-example-celery (pyproject.toml) +click==8.1.3 + # via + # celery + # click-didyoumean + # click-plugins + # click-repl + # flask +click-didyoumean==0.3.0 + # via celery +click-plugins==1.1.1 + # via celery +click-repl==0.2.0 + # via celery +flask==2.2.2 + # via flask-example-celery (pyproject.toml) +itsdangerous==2.1.2 + # via flask +jinja2==3.1.2 + # via flask +kombu==5.2.4 + # via celery +markupsafe==2.1.2 + # via + # jinja2 + # werkzeug +prompt-toolkit==3.0.36 + # via click-repl +pytz==2022.7.1 + # via celery +redis==4.5.1 + # via celery +six==1.16.0 + # via click-repl +vine==5.0.0 + # via + # amqp + # celery + # kombu +wcwidth==0.2.6 + # via prompt-toolkit +werkzeug==2.2.2 + # via flask diff --git a/examples/celery/src/task_app/__init__.py b/examples/celery/src/task_app/__init__.py new file mode 100644 index 0000000000..dafff8aad8 --- /dev/null +++ b/examples/celery/src/task_app/__init__.py @@ -0,0 +1,39 @@ +from celery import Celery +from celery import Task +from flask import Flask +from flask import render_template + + +def create_app() -> Flask: + app = Flask(__name__) + app.config.from_mapping( + CELERY=dict( + broker_url="redis://localhost", + result_backend="redis://localhost", + task_ignore_result=True, + ), + ) + app.config.from_prefixed_env() + celery_init_app(app) + + @app.route("/") + def index() -> str: + return render_template("index.html") + + from . import views + + app.register_blueprint(views.bp) + return app + + +def celery_init_app(app: Flask) -> Celery: + class FlaskTask(Task): + def __call__(self, *args: object, **kwargs: object) -> object: + with app.app_context(): + return self.run(*args, **kwargs) + + celery_app = Celery(app.name, task_cls=FlaskTask) + celery_app.config_from_object(app.config["CELERY"]) + celery_app.set_default() + app.extensions["celery"] = celery_app + return celery_app diff --git a/examples/celery/src/task_app/tasks.py b/examples/celery/src/task_app/tasks.py new file mode 100644 index 0000000000..b6b3595d22 --- /dev/null +++ b/examples/celery/src/task_app/tasks.py @@ -0,0 +1,23 @@ +import time + +from celery import shared_task +from celery import Task + + +@shared_task(ignore_result=False) +def add(a: int, b: int) -> int: + return a + b + + +@shared_task() +def block() -> None: + time.sleep(5) + + +@shared_task(bind=True, ignore_result=False) +def process(self: Task, total: int) -> object: + for i in range(total): + self.update_state(state="PROGRESS", meta={"current": i + 1, "total": total}) + time.sleep(1) + + return {"current": total, "total": total} diff --git a/examples/celery/src/task_app/templates/index.html b/examples/celery/src/task_app/templates/index.html new file mode 100644 index 0000000000..4e1145cb8f --- /dev/null +++ b/examples/celery/src/task_app/templates/index.html @@ -0,0 +1,108 @@ + + + + + Celery Example + + +

Celery Example

+Execute background tasks with Celery. Submits tasks and shows results using JavaScript. + +
+

Add

+

Start a task to add two numbers, then poll for the result. +

+
+
+ +
+

Result:

+ +
+

Block

+

Start a task that takes 5 seconds. However, the response will return immediately. +

+ +
+

+ +
+

Process

+

Start a task that counts, waiting one second each time, showing progress. +

+
+ +
+

+ + + + diff --git a/examples/celery/src/task_app/views.py b/examples/celery/src/task_app/views.py new file mode 100644 index 0000000000..99cf92dc20 --- /dev/null +++ b/examples/celery/src/task_app/views.py @@ -0,0 +1,38 @@ +from celery.result import AsyncResult +from flask import Blueprint +from flask import request + +from . import tasks + +bp = Blueprint("tasks", __name__, url_prefix="/tasks") + + +@bp.get("/result/") +def result(id: str) -> dict[str, object]: + result = AsyncResult(id) + ready = result.ready() + return { + "ready": ready, + "successful": result.successful() if ready else None, + "value": result.get() if ready else result.result, + } + + +@bp.post("/add") +def add() -> dict[str, object]: + a = request.form.get("a", type=int) + b = request.form.get("b", type=int) + result = tasks.add.delay(a, b) + return {"result_id": result.id} + + +@bp.post("/block") +def block() -> dict[str, object]: + result = tasks.block.delay() + return {"result_id": result.id} + + +@bp.post("/process") +def process() -> dict[str, object]: + result = tasks.process.delay(total=request.form.get("total", type=int)) + return {"result_id": result.id}