ref(crons): Move monitor dispatch code to monitors.tasks #54591

Merged
126 changes: 6 additions & 120 deletions src/sentry/monitors/consumers/monitor_consumer.py
@@ -3,7 +3,7 @@
import logging
import uuid
from datetime import datetime, timedelta
from typing import Dict, Literal, Mapping, Optional, TypedDict
from typing import Dict, Mapping, Optional

import msgpack
import sentry_sdk
@@ -15,9 +15,8 @@
from django.conf import settings
from django.db import router, transaction
from django.utils.text import slugify
from typing_extensions import NotRequired

from sentry import options, ratelimits
from sentry import ratelimits
from sentry.constants import ObjectStatus
from sentry.killswitches import killswitch_matches_context
from sentry.models import Project
@@ -32,7 +31,8 @@
MonitorLimitsExceeded,
MonitorType,
)
from sentry.monitors.tasks import check_missing, check_timeout
from sentry.monitors.tasks import try_monitor_tasks_trigger
from sentry.monitors.types import CheckinMessage, CheckinPayload, ClockPulseMessage
from sentry.monitors.utils import (
get_new_timeout_at,
get_timeout_at,
@@ -41,7 +41,7 @@
valid_duration,
)
from sentry.monitors.validators import ConfigValidator, MonitorCheckInValidator
from sentry.utils import json, metrics, redis
from sentry.utils import json, metrics
from sentry.utils.dates import to_datetime
from sentry.utils.locking import UnableToAcquireLock
from sentry.utils.locking.manager import LockManager
@@ -54,41 +54,6 @@
CHECKIN_QUOTA_LIMIT = 5
CHECKIN_QUOTA_WINDOW = 60

# This key is used to store the last timestamp that the tasks were triggered.
MONITOR_TASKS_LAST_TRIGGERED_KEY = "sentry.monitors.last_tasks_ts"


class CheckinMessage(TypedDict):
# TODO(epurkhiser): We should make this required and ensure the message
# produced by relay includes this message type
message_type: NotRequired[Literal["check_in"]]
payload: str
start_time: float
project_id: str
sdk: str


class ClockPulseMessage(TypedDict):
message_type: Literal["clock_pulse"]


class CheckinTrace(TypedDict):
trace_id: str


class CheckinContexts(TypedDict):
trace: NotRequired[CheckinTrace]


class CheckinPayload(TypedDict):
check_in_id: str
monitor_slug: str
status: str
environment: NotRequired[str]
duration: NotRequired[int]
monitor_config: NotRequired[Dict]
contexts: NotRequired[CheckinContexts]


def _ensure_monitor_with_config(
project: Project,
@@ -157,85 +122,6 @@ def _ensure_monitor_with_config
return monitor


def _dispatch_tasks(ts: datetime):
"""
Dispatch monitor tasks triggered by the consumer clock.

These tasks are triggered via the consumer processing check-ins. This
allows the monitor tasks to be synchronized to any backlog of check-ins
that are being processed.

To ensure these tasks are always triggered, there is an additional celery
beat task that produces a clock pulse message into the topic, which can be
used to trigger these tasks when there is a low volume of check-ins. It is,
however, preferred to have a high volume of check-ins so we do not need to
rely on celery beat, which in some cases may fail to trigger (for example on
sentry.io, deploys restart the celery beat worker and it will skip any
tasks it missed).
"""
if not options.get("monitors.use_consumer_clock_task_triggers"):
return

check_missing.delay(current_datetime=ts)
check_timeout.delay(current_datetime=ts)


def _try_monitor_tasks_trigger(ts: datetime):
"""
Handles triggering the monitor tasks when we've rolled over the minute.
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

# Trim the seconds off the timestamp; these tasks run once per minute and
# should have their timestamp clamped to the minute.
reference_datetime = ts.replace(second=0, microsecond=0)
reference_ts = int(reference_datetime.timestamp())

precheck_last_ts = redis_client.get(MONITOR_TASKS_LAST_TRIGGERED_KEY)
if precheck_last_ts is not None:
precheck_last_ts = int(precheck_last_ts)

# If we have the same or an older reference timestamp than the most recent
# tick, there is nothing to do; we've already handled this tick.
#
# The scenario where the reference_ts is older is likely due to a partition
# being slightly behind another partition that we've already read from.
if precheck_last_ts is not None and precheck_last_ts >= reference_ts:
return

# GETSET is atomic. This is critical to avoid another consumer also
# processing the same tick.
last_ts = redis_client.getset(MONITOR_TASKS_LAST_TRIGGERED_KEY, reference_ts)
if last_ts is not None:
last_ts = int(last_ts)

# Another consumer already handled the tick if the first LAST_TRIGGERED
# timestamp we got is different from the one we just got from the GETSET.
# Nothing needs to be done
if precheck_last_ts != last_ts:
return

# Track the delay from the true time. Ideally this should be pretty
# close, but in the case of a backlog it will be much higher.
total_delay = datetime.now().timestamp() - reference_ts

logger.info(
"monitors.consumer.clock_tick",
extra={"reference_datetime": str(reference_datetime)},
)
metrics.gauge("monitors.task.clock_delay", total_delay, sample_rate=1.0)

# If more than exactly a minute has passed then we've skipped a
# task run. Report that to Sentry; it is a problem.
if last_ts is not None and reference_ts > last_ts + 60:
with sentry_sdk.push_scope() as scope:
scope.set_extra("last_ts", last_ts)
scope.set_extra("reference_ts", reference_ts)
sentry_sdk.capture_message("Monitor task dispatch minute skipped")

_dispatch_tasks(ts)


def _process_message(ts: datetime, wrapper: CheckinMessage | ClockPulseMessage) -> None:
# XXX: Relay does not attach a message type, to properly discriminate the
# type we add it by default here. This can be removed once the message_type
@@ -244,7 +130,7 @@ def _process_message(ts: datetime, wrapper: CheckinMessage | ClockPulseMessage)
wrapper["message_type"] = "check_in"

try:
_try_monitor_tasks_trigger(ts)
try_monitor_tasks_trigger(ts)
except Exception:
logger.exception("Failed to trigger monitor tasks", exc_info=True)

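For orientation, the consumer side after this refactor boils down to the flow sketched below: every message advances the shared monitor clock via try_monitor_tasks_trigger, and the message_type field now defined in monitors.types discriminates clock pulses from real check-ins. This is a minimal sketch, not the PR's code: the clock_pulse early return and the JSON decoding of the payload string are assumptions inferred from the surrounding types, since the diff only shows the top of _process_message.

```python
from __future__ import annotations

from datetime import datetime

from sentry.monitors.tasks import try_monitor_tasks_trigger
from sentry.monitors.types import CheckinMessage, CheckinPayload, ClockPulseMessage
from sentry.utils import json


def process_message_sketch(ts: datetime, wrapper: CheckinMessage | ClockPulseMessage) -> None:
    # Relay does not attach a message type yet, so default it to "check_in",
    # mirroring the consumer code in this diff.
    if "message_type" not in wrapper:
        wrapper["message_type"] = "check_in"

    # Every message, including clock pulses, moves the monitor clock forward.
    try_monitor_tasks_trigger(ts)

    # Clock pulses carry no check-in data; they only keep the clock ticking
    # when check-in volume is low (assumed branch, not shown in this diff).
    if wrapper["message_type"] == "clock_pulse":
        return

    # A check-in wrapper carries its payload as a JSON-encoded string
    # (assumed decoding step, for illustration only); the real consumer goes
    # on to validate and persist the check-in from here.
    payload: CheckinPayload = json.loads(wrapper["payload"])
```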
89 changes: 88 additions & 1 deletion src/sentry/monitors/tasks.py
@@ -1,12 +1,15 @@
import logging
from datetime import datetime

import sentry_sdk
from django.conf import settings
from django.utils import timezone

from sentry import options
from sentry.constants import ObjectStatus
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics
from sentry.utils import metrics, redis

from .models import (
CheckInStatus,
@@ -41,6 +44,90 @@
# Format to use in the issue subtitle for the missed check-in timestamp
SUBTITLE_DATETIME_FORMAT = "%b %d, %I:%M %p"

# This key is used to store the last timestamp that the tasks were triggered.
MONITOR_TASKS_LAST_TRIGGERED_KEY = "sentry.monitors.last_tasks_ts"


def _dispatch_tasks(ts: datetime):
"""
Dispatch monitor tasks triggered by the consumer clock.

These tasks are triggered via the consumer processing check-ins. This
allows the monitor tasks to be synchronized to any backlog of check-ins
that are being processed.

To ensure these tasks are always triggered, there is an additional celery
beat task that produces a clock pulse message into the topic, which can be
used to trigger these tasks when there is a low volume of check-ins. It is,
however, preferred to have a high volume of check-ins so we do not need to
rely on celery beat, which in some cases may fail to trigger (for example on
sentry.io, deploys restart the celery beat worker and it will skip any
tasks it missed).
"""
if not options.get("monitors.use_consumer_clock_task_triggers"):
return

check_missing.delay(current_datetime=ts)
check_timeout.delay(current_datetime=ts)


def try_monitor_tasks_trigger(ts: datetime):
"""
Handles triggering the monitor tasks when we've rolled over the minute.

This function is called by our consumer processor
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

# Trim the seconds off the timestamp; these tasks run once per minute and
# should have their timestamp clamped to the minute.
reference_datetime = ts.replace(second=0, microsecond=0)
reference_ts = int(reference_datetime.timestamp())

precheck_last_ts = redis_client.get(MONITOR_TASKS_LAST_TRIGGERED_KEY)
if precheck_last_ts is not None:
precheck_last_ts = int(precheck_last_ts)

# If we have the same or an older reference timestamp than the most recent
# tick, there is nothing to do; we've already handled this tick.
#
# The scenario where the reference_ts is older is likely due to a partition
# being slightly behind another partition that we've already read from.
if precheck_last_ts is not None and precheck_last_ts >= reference_ts:
return

# GETSET is atomic. This is critical to avoid another consumer also
# processing the same tick.
last_ts = redis_client.getset(MONITOR_TASKS_LAST_TRIGGERED_KEY, reference_ts)
if last_ts is not None:
last_ts = int(last_ts)

# Another consumer already handled the tick if the first LAST_TRIGGERED
# timestamp we got is different from the one we just got from the GETSET.
# Nothing needs to be done
if precheck_last_ts != last_ts:
return

# Track the delay from the true time. Ideally this should be pretty
# close, but in the case of a backlog it will be much higher.
total_delay = datetime.now().timestamp() - reference_ts

logger.info(
"monitors.consumer.clock_tick",
extra={"reference_datetime": str(reference_datetime)},
)
metrics.gauge("monitors.task.clock_delay", total_delay, sample_rate=1.0)

# If more than exactly a minute has passed then we've skipped a
# task run. Report that to Sentry; it is a problem.
if last_ts is not None and reference_ts > last_ts + 60:
with sentry_sdk.push_scope() as scope:
scope.set_extra("last_ts", last_ts)
scope.set_extra("reference_ts", reference_ts)
sentry_sdk.capture_message("Monitor task dispatch minute skipped")

_dispatch_tasks(ts)


@instrumented_task(name="sentry.monitors.tasks.temp_task_dispatcher", silo_mode=SiloMode.REGION)
def temp_task_dispatcher():
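The pre-check plus GETSET sequence in try_monitor_tasks_trigger is the subtle part of this move, so here is a self-contained sketch of the same dedup logic against an in-memory stand-in for Redis. fake_getset, the driver loop, and the sample timestamps are illustrative only; they are not code from this PR.

```python
from datetime import datetime, timezone

KEY = "sentry.monitors.last_tasks_ts"
_store: dict[str, int] = {}
dispatched: list[int] = []


def fake_getset(key: str, value: int) -> int | None:
    # Stand-in for the atomic Redis GETSET the real code relies on.
    previous = _store.get(key)
    _store[key] = value
    return previous


def on_message(ts: datetime) -> None:
    # Clamp to the minute, exactly as try_monitor_tasks_trigger does.
    reference_ts = int(ts.replace(second=0, microsecond=0).timestamp())

    # Pre-check: if this tick (or a newer one) was already handled, e.g. by a
    # faster partition, bail out without writing anything.
    precheck_last_ts = _store.get(KEY)
    if precheck_last_ts is not None and precheck_last_ts >= reference_ts:
        return

    # "GETSET": atomically record this tick and read back the previous value.
    last_ts = fake_getset(KEY, reference_ts)

    # If the value moved between the pre-check and the GETSET, another
    # consumer won the race for this tick; nothing to do.
    if precheck_last_ts != last_ts:
        return

    # Only one caller per minute reaches this point and dispatches the tasks.
    dispatched.append(reference_ts)


for ts in (
    datetime(2023, 8, 10, 12, 0, 5, tzinfo=timezone.utc),   # first message in 12:00
    datetime(2023, 8, 10, 12, 0, 45, tzinfo=timezone.utc),  # same minute: skipped
    datetime(2023, 8, 10, 12, 1, 2, tzinfo=timezone.utc),   # minute rolled over: dispatched
):
    on_message(ts)

assert dispatched == [
    int(datetime(2023, 8, 10, 12, 0, tzinfo=timezone.utc).timestamp()),
    int(datetime(2023, 8, 10, 12, 1, tzinfo=timezone.utc).timestamp()),
]
```

One way to read the pre-check: without it, a lagging partition's GETSET would rewind the stored tick, letting a later message dispatch the same minute a second time.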
35 changes: 35 additions & 0 deletions src/sentry/monitors/types.py
@@ -0,0 +1,35 @@
from typing import Dict, Literal, TypedDict

from typing_extensions import NotRequired


class CheckinMessage(TypedDict):
# TODO(epurkhiser): We should make this required and ensure the message
# produced by relay includes this message type
message_type: NotRequired[Literal["check_in"]]
payload: str
start_time: float
project_id: str
sdk: str


class ClockPulseMessage(TypedDict):
Member Author: Will be using this type in both the tasks and the consumer; don't want an import cycle, so it moves here.

Contributor: Don't see it in the tasks yet. Future work?

Member Author: Yep #54647

message_type: Literal["clock_pulse"]


class CheckinTrace(TypedDict):
trace_id: str


class CheckinContexts(TypedDict):
trace: NotRequired[CheckinTrace]


class CheckinPayload(TypedDict):
check_in_id: str
monitor_slug: str
status: str
environment: NotRequired[str]
duration: NotRequired[int]
monitor_config: NotRequired[Dict]
contexts: NotRequired[CheckinContexts]
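
To make the nesting of these types concrete, here is a hedged example of a check-in wrapper and its embedded payload as the consumer would receive them. The field values are invented for illustration, and the JSON-string layering of payload is an inference from the payload: str annotation rather than something this file specifies.

```python
import json
import uuid
from datetime import datetime, timezone

from sentry.monitors.types import CheckinMessage, CheckinPayload, ClockPulseMessage

# The inner payload: what the consumer decodes out of wrapper["payload"].
payload: CheckinPayload = {
    "check_in_id": uuid.uuid4().hex,
    "monitor_slug": "nightly-backup",  # illustrative slug
    "status": "ok",                    # illustrative status value
    "environment": "production",       # NotRequired: may be omitted entirely
    "duration": 4125,                  # NotRequired: assumed to be milliseconds
}

# The outer Kafka wrapper: the payload travels as a JSON-encoded string.
wrapper: CheckinMessage = {
    "message_type": "check_in",
    "payload": json.dumps(payload),
    "start_time": datetime.now(timezone.utc).timestamp(),
    "project_id": "1",                 # illustrative project id (a string per the type)
    "sdk": "sentry.python/1.29.0",     # illustrative SDK identifier
}

# A clock pulse carries no payload at all; it exists only to keep the
# consumer's clock ticking when check-in volume is low.
pulse: ClockPulseMessage = {"message_type": "clock_pulse"}
```

Keeping these shapes in monitors.types lets both the consumer and the follow-up task work (#54647) share them without an import cycle, as discussed in the review thread above.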