diff --git a/sentry_sdk/integrations/celery.py b/sentry_sdk/integrations/celery.py index 7aa4e3673a..122aba1156 100644 --- a/sentry_sdk/integrations/celery.py +++ b/sentry_sdk/integrations/celery.py @@ -110,8 +110,8 @@ def sentry_build_tracer(name, task, *args, **kwargs): from celery.app.task import Task # type: ignore from celery import Celery # type: ignore - Task.apply_async = _wrap_apply_async(Task.apply_async) - Celery.send_task = _wrap_apply_async(Celery.send_task) + Task.apply_async = _wrap_task_run(Task.apply_async) + Celery.send_task = _wrap_task_run(Celery.send_task) _patch_worker_exit() @@ -146,7 +146,7 @@ def __exit__(self, exc_type, exc_value, traceback): return None -def _wrap_apply_async(f): +def _wrap_task_run(f): # type: (F) -> F @wraps(f) def apply_async(*args, **kwargs): @@ -172,10 +172,13 @@ def apply_async(*args, **kwargs): except (IndexError, TypeError): task_started_from_beat = False - task = args[0] + if isinstance(args[0], Task): + task_name = args[0].name + else: + task_name = args[1] span_mgr = ( - hub.start_span(op=OP.QUEUE_SUBMIT_CELERY, description=task.name) + hub.start_span(op=OP.QUEUE_SUBMIT_CELERY, description=task_name) if not task_started_from_beat else NoOpMgr() ) # type: Union[Span, NoOpMgr] diff --git a/tests/integrations/celery/test_celery.py b/tests/integrations/celery/test_celery.py index bcc857ba7d..9a2cfb1e41 100644 --- a/tests/integrations/celery/test_celery.py +++ b/tests/integrations/celery/test_celery.py @@ -6,7 +6,7 @@ from sentry_sdk.integrations.celery import ( CeleryIntegration, _get_headers, - _wrap_apply_async, + _wrap_task_run, ) from sentry_sdk._compat import text_type @@ -34,7 +34,7 @@ def init_celery(sentry_init, request): def inner(propagate_traces=True, backend="always_eager", **kwargs): sentry_init( integrations=[CeleryIntegration(propagate_traces=propagate_traces)], - **kwargs + **kwargs, ) celery = Celery(__name__) @@ -372,16 +372,16 @@ def test_redis_backend_trace_propagation( runs = [] @celery.task(name="dummy_task", bind=True) - def dummy_task(self): + def dummy_task(self, x, y): runs.append(1) 1 / 0 with start_transaction(name="submit_celery"): # Curious: Cannot use delay() here or py2.7-celery-4.2 crashes if execution_way == "apply_async": - res = dummy_task.apply_async() + res = dummy_task.apply_async(kwargs={"x": 1, "y": 0}) elif execution_way == "send_task": - res = celery.send_task("dummy_task") + res = celery.send_task("dummy_task", kwargs={"x": 1, "y": 0}) else: # pragma: no cover raise ValueError(execution_way) @@ -579,7 +579,7 @@ def dummy_function(*args, **kwargs): assert "sentry-trace" in headers assert "baggage" in headers - wrapped = _wrap_apply_async(dummy_function) + wrapped = _wrap_task_run(dummy_function) wrapped(mock.MagicMock(), (), headers={}) @@ -593,7 +593,7 @@ def dummy_function(*args, **kwargs): assert "sentry-trace" not in headers assert "baggage" not in headers - wrapped = _wrap_apply_async(dummy_function) + wrapped = _wrap_task_run(dummy_function) wrapped( mock.MagicMock(), [