Skip to content

Commit

Permalink
Auto monitoring beat update (#1989)
Browse files Browse the repository at this point in the history
- Small update to support Celery 4 and 5
- Changed the name of the schedule shelf file that we patch to have the suffix `-patched-by-sentry-sdk` instead of `.new` so in case there is an error with this new shelf file somewhere the users know that it is patched by the sentry sdk.
- Additionally some minor tweaks to make code more readable
  • Loading branch information
antonpirker authored Apr 5, 2023
1 parent d8a5a43 commit baf909d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 25 deletions.
30 changes: 15 additions & 15 deletions sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sys
import shutil
import functools
import tempfile

from sentry_sdk.consts import OP
from sentry_sdk._compat import reraise
Expand Down Expand Up @@ -320,6 +321,11 @@ def sentry_workloop(*args, **kwargs):
def _get_headers(task):
# type: (Task) -> Dict[str, Any]
headers = task.request.get("headers") or {}

if "headers" in headers:
headers.update(headers["headers"])
del headers["headers"]

return headers


Expand Down Expand Up @@ -392,9 +398,11 @@ def _reinstall_patched_tasks(app, sender, add_updated_periodic_tasks):
add_updated_periodic_task()

# Start Celery Beat (with new (cloned) schedule, because old one is still in use)
new_schedule_filename = sender.schedule_filename + ".new"
shutil.copy2(sender.schedule_filename, new_schedule_filename)
app.Beat(schedule=new_schedule_filename).run()
cloned_schedule = tempfile.NamedTemporaryFile(suffix="-patched-by-sentry-sdk")
with open(sender.schedule_filename, "rb") as original_schedule:
shutil.copyfileobj(original_schedule, cloned_schedule)

app.Beat(schedule=cloned_schedule.name).run()


# Nested functions do not work as Celery hook receiver,
Expand Down Expand Up @@ -480,9 +488,7 @@ def crons_task_before_run(sender, **kwargs):
if "sentry-monitor-slug" not in headers:
return

monitor_config = (
headers["sentry-monitor-config"] if "sentry-monitor-config" in headers else {}
)
monitor_config = headers.get("sentry-monitor-config", {})

start_timestamp_s = now()

Expand All @@ -506,9 +512,7 @@ def crons_task_success(sender, **kwargs):
if "sentry-monitor-slug" not in headers:
return

monitor_config = (
headers["sentry-monitor-config"] if "sentry-monitor-config" in headers else {}
)
monitor_config = headers.get("sentry-monitor-config", {})

start_timestamp_s = headers["sentry-monitor-start-timestamp-s"]

Expand All @@ -529,9 +533,7 @@ def crons_task_failure(sender, **kwargs):
if "sentry-monitor-slug" not in headers:
return

monitor_config = (
headers["sentry-monitor-config"] if "sentry-monitor-config" in headers else {}
)
monitor_config = headers.get("sentry-monitor-config", {})

start_timestamp_s = headers["sentry-monitor-start-timestamp-s"]

Expand All @@ -552,9 +554,7 @@ def crons_task_retry(sender, **kwargs):
if "sentry-monitor-slug" not in headers:
return

monitor_config = (
headers["sentry-monitor-config"] if "sentry-monitor-config" in headers else {}
)
monitor_config = headers.get("sentry-monitor-config", {})

start_timestamp_s = headers["sentry-monitor-start-timestamp-s"]

Expand Down
39 changes: 29 additions & 10 deletions tests/integrations/celery/test_celery_beat_crons.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import tempfile
import mock

import pytest
Expand Down Expand Up @@ -37,6 +38,20 @@ def test_get_headers():

assert _get_headers(fake_task) == {"bla": "blub"}

fake_task.request.update(
{
"headers": {
"headers": {
"tri": "blub",
"bar": "baz",
},
"bla": "blub",
},
}
)

assert _get_headers(fake_task) == {"bla": "blub", "tri": "blub", "bar": "baz"}


@pytest.mark.parametrize(
"seconds, expected_tuple",
Expand Down Expand Up @@ -273,16 +288,20 @@ def test_reinstall_patched_tasks():

add_updated_periodic_tasks = [mock.MagicMock(), mock.MagicMock(), mock.MagicMock()]

with mock.patch("sentry_sdk.integrations.celery.shutil.copy2") as mock_copy2:
_reinstall_patched_tasks(app, sender, add_updated_periodic_tasks)
mock_open = mock.Mock(return_value=tempfile.NamedTemporaryFile())

sender.stop.assert_called_once_with()
with mock.patch("sentry_sdk.integrations.celery.open", mock_open):
with mock.patch(
"sentry_sdk.integrations.celery.shutil.copyfileobj"
) as mock_copyfileobj:
_reinstall_patched_tasks(app, sender, add_updated_periodic_tasks)

add_updated_periodic_tasks[0].assert_called_once_with()
add_updated_periodic_tasks[1].assert_called_once_with()
add_updated_periodic_tasks[2].assert_called_once_with()
sender.stop.assert_called_once_with()

mock_copy2.assert_called_once_with(
"test_schedule_filename", "test_schedule_filename.new"
)
fake_beat.run.assert_called_once_with()
add_updated_periodic_tasks[0].assert_called_once_with()
add_updated_periodic_tasks[1].assert_called_once_with()
add_updated_periodic_tasks[2].assert_called_once_with()

mock_copyfileobj.assert_called_once()

fake_beat.run.assert_called_once_with()

0 comments on commit baf909d

Please sign in to comment.