From f3b3f65a3ca3f2f6141dfe8bc09c019c5cc6a8cb Mon Sep 17 00:00:00 2001 From: Evgeny Seregin Date: Wed, 22 Feb 2023 18:04:08 +0300 Subject: [PATCH] feat(arq): add arq integration (#1872) Initial integration for arq --- .github/workflows/test-integration-arq.yml | 73 ++++++++ mypy.ini | 2 + sentry_sdk/consts.py | 2 + sentry_sdk/integrations/arq.py | 203 +++++++++++++++++++++ setup.py | 1 + tests/integrations/arq/__init__.py | 3 + tests/integrations/arq/test_arq.py | 159 ++++++++++++++++ tox.ini | 9 + 8 files changed, 452 insertions(+) create mode 100644 .github/workflows/test-integration-arq.yml create mode 100644 sentry_sdk/integrations/arq.py create mode 100644 tests/integrations/arq/__init__.py create mode 100644 tests/integrations/arq/test_arq.py diff --git a/.github/workflows/test-integration-arq.yml b/.github/workflows/test-integration-arq.yml new file mode 100644 index 0000000000..2eee836bc1 --- /dev/null +++ b/.github/workflows/test-integration-arq.yml @@ -0,0 +1,73 @@ +name: Test arq + +on: + push: + branches: + - master + - release/** + + pull_request: + +# Cancel in progress workflows on pull_requests. +# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} + cancel-in-progress: true + +permissions: + contents: read + +env: + BUILD_CACHE_KEY: ${{ github.sha }} + CACHED_BUILD_PATHS: | + ${{ github.workspace }}/dist-serverless + +jobs: + test: + name: arq, python ${{ matrix.python-version }}, ${{ matrix.os }} + runs-on: ${{ matrix.os }} + timeout-minutes: 45 + + strategy: + fail-fast: false + matrix: + python-version: ["3.7","3.8","3.9","3.10","3.11"] + # python3.6 reached EOL and is no longer being supported on + # new versions of hosted runners on Github Actions + # ubuntu-20.04 is the last version that supported python3.6 + # see https://github.com/actions/setup-python/issues/544#issuecomment-1332535877 + os: [ubuntu-20.04] + + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Setup Test Env + run: | + pip install codecov "tox>=3,<4" + + - name: Test arq + timeout-minutes: 45 + shell: bash + run: | + set -x # print commands that are executed + coverage erase + + ./scripts/runtox.sh "${{ matrix.python-version }}-arq" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch + coverage combine .coverage* + coverage xml -i + codecov --file coverage.xml + + check_required_tests: + name: All arq tests passed or skipped + needs: test + # Always run this, even if a dependent job failed + if: always() + runs-on: ubuntu-20.04 + steps: + - name: Check for failures + if: contains(needs.test.result, 'failure') + run: | + echo "One of the dependent jobs have failed. You may need to re-run it." 
&& exit 1 diff --git a/mypy.ini b/mypy.ini index 6e8f6b7230..0d12e43280 100644 --- a/mypy.ini +++ b/mypy.ini @@ -65,3 +65,5 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-huey.*] ignore_missing_imports = True +[mypy-arq.*] +ignore_missing_imports = True diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 2d2b28b9ee..d5c9b19a45 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -65,6 +65,8 @@ class OP: MIDDLEWARE_STARLITE = "middleware.starlite" MIDDLEWARE_STARLITE_RECEIVE = "middleware.starlite.receive" MIDDLEWARE_STARLITE_SEND = "middleware.starlite.send" + QUEUE_SUBMIT_ARQ = "queue.submit.arq" + QUEUE_TASK_ARQ = "queue.task.arq" QUEUE_SUBMIT_CELERY = "queue.submit.celery" QUEUE_TASK_CELERY = "queue.task.celery" QUEUE_TASK_RQ = "queue.task.rq" diff --git a/sentry_sdk/integrations/arq.py b/sentry_sdk/integrations/arq.py new file mode 100644 index 0000000000..195272a4c7 --- /dev/null +++ b/sentry_sdk/integrations/arq.py @@ -0,0 +1,203 @@ +from __future__ import absolute_import + +import sys + +from sentry_sdk._compat import reraise +from sentry_sdk._types import MYPY +from sentry_sdk import Hub +from sentry_sdk.consts import OP +from sentry_sdk.hub import _should_send_default_pii +from sentry_sdk.integrations import DidNotEnable, Integration +from sentry_sdk.integrations.logging import ignore_logger +from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK +from sentry_sdk.utils import ( + capture_internal_exceptions, + event_from_exception, + SENSITIVE_DATA_SUBSTITUTE, +) + +try: + import arq.worker + from arq.version import VERSION as ARQ_VERSION + from arq.connections import ArqRedis + from arq.worker import JobExecutionFailed, Retry, RetryJob, Worker +except ImportError: + raise DidNotEnable("Arq is not installed") + +if MYPY: + from typing import Any, Dict, Optional + + from sentry_sdk._types import EventProcessor, Event, ExcInfo, Hint + + from arq.jobs import Job + from arq.typing import WorkerCoroutine + from arq.worker import Function + +ARQ_CONTROL_FLOW_EXCEPTIONS = (JobExecutionFailed, Retry, RetryJob) + + +class ArqIntegration(Integration): + identifier = "arq" + + @staticmethod + def setup_once(): + # type: () -> None + + try: + if isinstance(ARQ_VERSION, str): + version = tuple(map(int, ARQ_VERSION.split(".")[:2])) + else: + version = ARQ_VERSION.version[:2] + except (TypeError, ValueError): + raise DidNotEnable("arq version unparsable: {}".format(ARQ_VERSION)) + + if version < (0, 23): + raise DidNotEnable("arq 0.23 or newer required.") + + patch_enqueue_job() + patch_run_job() + patch_func() + + ignore_logger("arq.worker") + + +def patch_enqueue_job(): + # type: () -> None + old_enqueue_job = ArqRedis.enqueue_job + + async def _sentry_enqueue_job(self, function, *args, **kwargs): + # type: (ArqRedis, str, *Any, **Any) -> Optional[Job] + hub = Hub.current + + if hub.get_integration(ArqIntegration) is None: + return await old_enqueue_job(self, function, *args, **kwargs) + + with hub.start_span(op=OP.QUEUE_SUBMIT_ARQ, description=function): + return await old_enqueue_job(self, function, *args, **kwargs) + + ArqRedis.enqueue_job = _sentry_enqueue_job + + +def patch_run_job(): + # type: () -> None + old_run_job = Worker.run_job + + async def _sentry_run_job(self, job_id, score): + # type: (Worker, str, int) -> None + hub = Hub(Hub.current) + + if hub.get_integration(ArqIntegration) is None: + return await old_run_job(self, job_id, score) + + with hub.push_scope() as scope: + scope._name = "arq" + scope.clear_breadcrumbs() + 
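+            # NOTE: the transaction is created with a placeholder name and an
+            # "ok" status; the event processor installed by _wrap_coroutine
+            # later renames it to the real job name, and _capture_exception
+            # sets the final status if the job raises.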
+ transaction = Transaction( + name="unknown arq task", + status="ok", + op=OP.QUEUE_TASK_ARQ, + source=TRANSACTION_SOURCE_TASK, + ) + + with hub.start_transaction(transaction): + return await old_run_job(self, job_id, score) + + Worker.run_job = _sentry_run_job + + +def _capture_exception(exc_info): + # type: (ExcInfo) -> None + hub = Hub.current + + if hub.scope.transaction is not None: + if exc_info[0] in ARQ_CONTROL_FLOW_EXCEPTIONS: + hub.scope.transaction.set_status("aborted") + return + + hub.scope.transaction.set_status("internal_error") + + event, hint = event_from_exception( + exc_info, + client_options=hub.client.options if hub.client else None, + mechanism={"type": ArqIntegration.identifier, "handled": False}, + ) + hub.capture_event(event, hint=hint) + + +def _make_event_processor(ctx, *args, **kwargs): + # type: (Dict[Any, Any], *Any, **Any) -> EventProcessor + def event_processor(event, hint): + # type: (Event, Hint) -> Optional[Event] + + hub = Hub.current + + with capture_internal_exceptions(): + if hub.scope.transaction is not None: + hub.scope.transaction.name = ctx["job_name"] + event["transaction"] = ctx["job_name"] + + tags = event.setdefault("tags", {}) + tags["arq_task_id"] = ctx["job_id"] + tags["arq_task_retry"] = ctx["job_try"] > 1 + extra = event.setdefault("extra", {}) + extra["arq-job"] = { + "task": ctx["job_name"], + "args": args + if _should_send_default_pii() + else SENSITIVE_DATA_SUBSTITUTE, + "kwargs": kwargs + if _should_send_default_pii() + else SENSITIVE_DATA_SUBSTITUTE, + "retry": ctx["job_try"], + } + + return event + + return event_processor + + +def _wrap_coroutine(name, coroutine): + # type: (str, WorkerCoroutine) -> WorkerCoroutine + async def _sentry_coroutine(ctx, *args, **kwargs): + # type: (Dict[Any, Any], *Any, **Any) -> Any + hub = Hub.current + if hub.get_integration(ArqIntegration) is None: + return await coroutine(*args, **kwargs) + + hub.scope.add_event_processor( + _make_event_processor({**ctx, "job_name": name}, *args, **kwargs) + ) + + try: + result = await coroutine(ctx, *args, **kwargs) + except Exception: + exc_info = sys.exc_info() + _capture_exception(exc_info) + reraise(*exc_info) + + return result + + return _sentry_coroutine + + +def patch_func(): + # type: () -> None + old_func = arq.worker.func + + def _sentry_func(*args, **kwargs): + # type: (*Any, **Any) -> Function + hub = Hub.current + + if hub.get_integration(ArqIntegration) is None: + return old_func(*args, **kwargs) + + func = old_func(*args, **kwargs) + + if not getattr(func, "_sentry_is_patched", False): + func.coroutine = _wrap_coroutine(func.name, func.coroutine) + func._sentry_is_patched = True + + return func + + arq.worker.func = _sentry_func diff --git a/setup.py b/setup.py index 07756acabc..3a96380a11 100644 --- a/setup.py +++ b/setup.py @@ -53,6 +53,7 @@ def get_file_text(file_name): "celery": ["celery>=3"], "huey": ["huey>=2"], "beam": ["apache-beam>=2.12"], + "arq": ["arq>=0.23"], "rq": ["rq>=0.6"], "aiohttp": ["aiohttp>=3.5"], "tornado": ["tornado>=5"], diff --git a/tests/integrations/arq/__init__.py b/tests/integrations/arq/__init__.py new file mode 100644 index 0000000000..f0b4712255 --- /dev/null +++ b/tests/integrations/arq/__init__.py @@ -0,0 +1,3 @@ +import pytest + +pytest.importorskip("arq") diff --git a/tests/integrations/arq/test_arq.py b/tests/integrations/arq/test_arq.py new file mode 100644 index 0000000000..d7e0e8af85 --- /dev/null +++ b/tests/integrations/arq/test_arq.py @@ -0,0 +1,159 @@ +import pytest + +from sentry_sdk import 
start_transaction +from sentry_sdk.integrations.arq import ArqIntegration + +from arq.connections import ArqRedis +from arq.jobs import Job +from arq.utils import timestamp_ms +from arq.worker import Retry, Worker + +from fakeredis.aioredis import FakeRedis + + +@pytest.fixture(autouse=True) +def patch_fakeredis_info_command(): + from fakeredis._fakesocket import FakeSocket + + if not hasattr(FakeSocket, "info"): + from fakeredis._commands import command + from fakeredis._helpers import SimpleString + + @command((SimpleString,), name="info") + def info(self, section): + return section + + FakeSocket.info = info + + +@pytest.fixture +def init_arq(sentry_init): + def inner(functions, allow_abort_jobs=False): + sentry_init( + integrations=[ArqIntegration()], + traces_sample_rate=1.0, + send_default_pii=True, + debug=True, + ) + + server = FakeRedis() + pool = ArqRedis(pool_or_conn=server.connection_pool) + return pool, Worker( + functions, redis_pool=pool, allow_abort_jobs=allow_abort_jobs + ) + + return inner + + +@pytest.mark.asyncio +async def test_job_result(init_arq): + async def increase(ctx, num): + return num + 1 + + increase.__qualname__ = increase.__name__ + + pool, worker = init_arq([increase]) + + job = await pool.enqueue_job("increase", 3) + + assert isinstance(job, Job) + + await worker.run_job(job.job_id, timestamp_ms()) + result = await job.result() + job_result = await job.result_info() + + assert result == 4 + assert job_result.result == 4 + + +@pytest.mark.asyncio +async def test_job_retry(capture_events, init_arq): + async def retry_job(ctx): + if ctx["job_try"] < 2: + raise Retry + + retry_job.__qualname__ = retry_job.__name__ + + pool, worker = init_arq([retry_job]) + + job = await pool.enqueue_job("retry_job") + + events = capture_events() + + await worker.run_job(job.job_id, timestamp_ms()) + + event = events.pop(0) + assert event["contexts"]["trace"]["status"] == "aborted" + assert event["transaction"] == "retry_job" + assert event["tags"]["arq_task_id"] == job.job_id + assert event["extra"]["arq-job"]["retry"] == 1 + + await worker.run_job(job.job_id, timestamp_ms()) + + event = events.pop(0) + assert event["contexts"]["trace"]["status"] == "ok" + assert event["transaction"] == "retry_job" + assert event["tags"]["arq_task_id"] == job.job_id + assert event["extra"]["arq-job"]["retry"] == 2 + + +@pytest.mark.parametrize("job_fails", [True, False], ids=["error", "success"]) +@pytest.mark.asyncio +async def test_job_transaction(capture_events, init_arq, job_fails): + async def division(_, a, b=0): + return a / b + + division.__qualname__ = division.__name__ + + pool, worker = init_arq([division]) + + events = capture_events() + + job = await pool.enqueue_job("division", 1, b=int(not job_fails)) + await worker.run_job(job.job_id, timestamp_ms()) + + if job_fails: + error_event = events.pop(0) + assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError" + assert error_event["exception"]["values"][0]["mechanism"]["type"] == "arq" + + (event,) = events + assert event["type"] == "transaction" + assert event["transaction"] == "division" + assert event["transaction_info"] == {"source": "task"} + + if job_fails: + assert event["contexts"]["trace"]["status"] == "internal_error" + else: + assert event["contexts"]["trace"]["status"] == "ok" + + assert "arq_task_id" in event["tags"] + assert "arq_task_retry" in event["tags"] + + extra = event["extra"]["arq-job"] + assert extra["task"] == "division" + assert extra["args"] == [1] + assert extra["kwargs"] == {"b": 
int(not job_fails)} + assert extra["retry"] == 1 + + +@pytest.mark.asyncio +async def test_enqueue_job(capture_events, init_arq): + async def dummy_job(_): + pass + + pool, _ = init_arq([dummy_job]) + + events = capture_events() + + with start_transaction() as transaction: + await pool.enqueue_job("dummy_job") + + (event,) = events + + assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id + assert event["contexts"]["trace"]["span_id"] == transaction.span_id + + assert len(event["spans"]) + assert event["spans"][0]["op"] == "queue.submit.arq" + assert event["spans"][0]["description"] == "dummy_job" diff --git a/tox.ini b/tox.ini index 55af0dfd8c..8712769031 100644 --- a/tox.ini +++ b/tox.ini @@ -22,6 +22,9 @@ envlist = {py3.7}-aiohttp-v{3.5} {py3.7,py3.8,py3.9,py3.10,py3.11}-aiohttp-v{3.6} + # Arq + {py3.7,py3.8,py3.9,py3.10,py3.11}-arq + # Asgi {py3.7,py3.8,py3.9,py3.10,py3.11}-asgi @@ -175,6 +178,11 @@ deps = aiohttp-v3.5: aiohttp>=3.5.0,<3.6.0 aiohttp: pytest-aiohttp + # Arq + arq: arq>=0.23.0 + arq: fakeredis>=2.2.0 + arq: pytest-asyncio + # Asgi asgi: pytest-asyncio asgi: async-asgi-testclient @@ -400,6 +408,7 @@ setenv = PYTHONDONTWRITEBYTECODE=1 TESTPATH=tests aiohttp: TESTPATH=tests/integrations/aiohttp + arq: TESTPATH=tests/integrations/arq asgi: TESTPATH=tests/integrations/asgi aws_lambda: TESTPATH=tests/integrations/aws_lambda beam: TESTPATH=tests/integrations/beam
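
For context, a minimal sketch (not part of the patch) of how an application might enable the new integration once it ships. The DSN, the Redis settings, and the download_content task below are illustrative placeholders, not values taken from this change.

import sentry_sdk
from sentry_sdk.integrations.arq import ArqIntegration

from arq import create_pool
from arq.connections import RedisSettings

sentry_sdk.init(
    dsn="https://<key>@<org>.ingest.sentry.io/<project>",  # placeholder DSN
    integrations=[ArqIntegration()],
    traces_sample_rate=1.0,  # required to record queue.submit.arq / queue.task.arq spans
)

async def download_content(ctx, url):
    # Unhandled exceptions raised here are reported to Sentry by the wrapped
    # coroutine; arq's Retry/RetryJob/JobExecutionFailed are treated as control
    # flow and mark the transaction "aborted" instead of "internal_error".
    ...

class WorkerSettings:
    # Used by the arq CLI worker (e.g. `arq myapp.WorkerSettings`); Worker.run_job
    # is patched so every executed job becomes a queue.task.arq transaction.
    functions = [download_content]
    redis_settings = RedisSettings()

async def enqueue(url):
    # ArqRedis.enqueue_job is wrapped, so this submission shows up as a
    # queue.submit.arq span on the currently active transaction.
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job("download_content", url)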