Skip to content

Commit

Permalink
Add wrapper for Celery().send_task to support behavior as `Task.app…
Browse files Browse the repository at this point in the history
…ly_async`
  • Loading branch information
Vlad Vladov committed Sep 16, 2023
1 parent a7b4144 commit 7d19bdd
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 6 deletions.
11 changes: 8 additions & 3 deletions sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,10 @@ def sentry_build_tracer(name, task, *args, **kwargs):
trace.build_tracer = sentry_build_tracer

from celery.app.task import Task # type: ignore
from celery import Celery # type: ignore

Task.apply_async = _wrap_apply_async(Task.apply_async)
Celery.send_task = _wrap_apply_async(Celery.send_task)

_patch_worker_exit()

Expand Down Expand Up @@ -154,9 +156,12 @@ def apply_async(*args, **kwargs):
if not propagate_traces:
return f(*args, **kwargs)

with hub.start_span(
op=OP.QUEUE_SUBMIT_CELERY, description=args[0].name
) as span:
if isinstance(args[0], Task):
description = args[0].name
else:
description = args[1]

with hub.start_span(op=OP.QUEUE_SUBMIT_CELERY, description=description) as span:
with capture_internal_exceptions():
headers = dict(hub.iter_trace_propagation_headers(span))
if integration.monitor_beat_tasks:
Expand Down
14 changes: 11 additions & 3 deletions tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,12 @@ def dummy_task(self):


# TODO: This test is hanging when running test with `tox --parallel auto`. Find out why and fix it!
@pytest.mark.skip
# @pytest.mark.skip
@pytest.mark.forked
def test_redis_backend_trace_propagation(init_celery, capture_events_forksafe):
@pytest.mark.parametrize("execution_way", ["apply_async", "send_task"])
def test_redis_backend_trace_propagation(
init_celery, capture_events_forksafe, execution_way
):
celery = init_celery(traces_sample_rate=1.0, backend="redis", debug=True)

events = capture_events_forksafe()
Expand All @@ -373,7 +376,12 @@ def dummy_task(self):

with start_transaction(name="submit_celery"):
# Curious: Cannot use delay() here or py2.7-celery-4.2 crashes
res = dummy_task.apply_async()
if execution_way == "apply_async":
res = dummy_task.apply_async()
elif execution_way == "send_task":
res = celery.send_task("dummy_task")
else: # pragma: no cover
raise ValueError(execution_way)

with pytest.raises(Exception): # noqa: B017
# Celery 4.1 raises a gibberish exception
Expand Down

0 comments on commit 7d19bdd

Please sign in to comment.