Skip to content

Commit

Permalink
feat(crons): Refactor monitor tasks to be triggered via a kafka clock
Browse files Browse the repository at this point in the history
Fixes GH-53661
  • Loading branch information
evanpurkhiser committed Aug 4, 2023
1 parent e73e9d7 commit b851653
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 7 deletions.
31 changes: 30 additions & 1 deletion src/sentry/conf/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def env(
SENTRY_ARTIFACT_BUNDLES_INDEXING_REDIS_CLUSTER = "default"
SENTRY_INTEGRATION_ERROR_LOG_REDIS_CLUSTER = "default"
SENTRY_DEBUG_FILES_REDIS_CLUSTER = "default"
SENTRY_MONITORS_REDIS_CLUSTER = "default"

# Hosts that are allowed to use system token authentication.
# http://en.wikipedia.org/wiki/Reserved_IP_addresses
Expand Down Expand Up @@ -3642,6 +3643,34 @@ def build_cdc_postgres_init_db_volume(settings: Any) -> dict[str, dict[str, str]
# Feature flag to turn off role-swapping to help bridge getsentry transition.
USE_ROLE_SWAPPING_IN_TESTS = True


SENTRY_METRICS_INTERFACE_BACKEND = "sentry.sentry_metrics.client.snuba.SnubaMetricsBackend"
SENTRY_METRICS_INTERFACE_BACKEND_OPTIONS: dict[str, Any] = {}

# This setting configures how the Monitors (Crons) feature will run the tasks
# responsible for marking monitors as having "Missed" check-ins and having
# "Timed out" check-ins.
#
# These two tasks must be run every minute and should be run as close to the
# leading minute boundary as possible. By default these tasks will be
# triggered via a clock pulse that is generated by a celery beat task. The
# sentry.monitors.consumer service is responsible for detecting this clock
# pulse and dispatching the tasks.
#
# When high volume mode is enabled, a clock pulse will not be generated by
# celery beat, instead the monitor consumer will use all processed check-in
# messages as its clock. We track message timestamps (floored to the minute)
# and any time that timestamp changes over a minute, the tasks will be
# triggered
#
# NOTE: THERE MUST BE A HIGH VOLUME OF CHECK-INS TO USE THIS MODE!! If a
# check-in message is not consumed the tasks will not run, and missed
# check-ins will not be generated!
#
# The advantage of high volume mode is that we will not rely on celery beat to
# accurately trigger clock pulses. This is important in scenarios where it is
# not possible to guarantee that the celery beat tasks will run every minute.
#
# (For example, when sentry.io deploys, there is a short period where the
# celery tasks are being restarted, if they are not running during the minute
# boundary, the task will not run)
SENTRY_MONITORS_HIGH_VOLUME_MODE = False
82 changes: 76 additions & 6 deletions src/sentry/monitors/consumers/monitor_consumer.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
from __future__ import annotations

import logging
import uuid
from datetime import datetime, timedelta
from typing import Dict, Mapping, Optional, TypedDict
from typing import Dict, Literal, Mapping, Optional, TypedDict

import msgpack
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.processing.strategies.commit import CommitOffsets
from arroyo.processing.strategies.run_task import RunTask
from arroyo.types import Commit, Message, Partition
from arroyo.types import BrokerValue, Commit, Message, Partition
from django.conf import settings
from django.db import router, transaction
from django.utils.text import slugify
Expand All @@ -29,6 +31,7 @@
MonitorLimitsExceeded,
MonitorType,
)
from sentry.monitors.tasks import check_missing, check_timeout
from sentry.monitors.utils import (
get_new_timeout_at,
get_timeout_at,
Expand All @@ -37,7 +40,7 @@
valid_duration,
)
from sentry.monitors.validators import ConfigValidator, MonitorCheckInValidator
from sentry.utils import json, metrics
from sentry.utils import json, metrics, redis
from sentry.utils.dates import to_datetime
from sentry.utils.locking import UnableToAcquireLock
from sentry.utils.locking.manager import LockManager
Expand All @@ -52,12 +55,17 @@


class CheckinMessage(TypedDict):
message_type: NotRequired[Literal["check_in"]]
payload: str
start_time: str
start_time: float
project_id: str
sdk: str


class ClockPulseMessage(TypedDict):
message_type: Literal["clock_pulse"]


class CheckinTrace(TypedDict):
trace_id: str

Expand Down Expand Up @@ -140,7 +148,68 @@ def _ensure_monitor_with_config(
return monitor


def _process_message(wrapper: CheckinMessage) -> None:
def _dispatch_tasks(ts: datetime):
check_missing.delay(current_datetime=ts)
check_timeout.delay(current_datetime=ts)


def _handle_clock_pulse_task_trigger(ts: datetime):
"""
Handles clock pulse messages. These pulses are generated by the
`sentry.monitors.tasks.clock_pulse` tasks which runs every minute. Clock
pulses will NOT be generated when SENTRY_MONITORS_HIGH_VOLUME_MODE is
enabled.
This function is responsible for dispatching the missed check-in and timed
out check-in detection tasks.
"""
_dispatch_tasks(ts)


def _try_handle_high_volume_task_trigger(ts: datetime):
"""
When SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled we use each check-in
message as a pseudo clock.
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

# Trim the timestamp seconds off, these tasks are run once per minute and
# should have their timestamp clamped to the minute.
current_datetime = ts.replace(second=0, microsecond=0)
current_ts = int(current_datetime.timestamp())

last_ts = redis_client.get("sentry.monitors.last_tasks_ts")

# Do nothing until the message we process moves across the minute boundary
if last_ts == current_ts:
return

try:
lock = locks.get("sentry.monitors.task_trigger", duration=5)
with lock.acquire():
_dispatch_tasks(ts)
redis_client.set("sentry.monitors.last_tasks_ts", current_ts)
except UnableToAcquireLock:
# Another message processor is handling this. Nothing to do
pass


def _process_message(ts: datetime, wrapper: CheckinMessage | ClockPulseMessage) -> None:
# XXX: Relay does not attach a message type, to properly discriminate the
# type we add it by default here.
if "message_type" not in wrapper:
wrapper["message_type"] = "check_in"

if wrapper["message_type"] == "clock_pulse":
_handle_clock_pulse_task_trigger(ts)
return

# When running in high volume mode we will not consume clock pulses (The
# clock_pulse task is not enabled). Instead we use each check-in message as
# a means for trigering our tasks.
if settings.SENTRY_MONITORS_HIGH_VOLUME_MODE:
_try_handle_high_volume_task_trigger(ts)

params: CheckinPayload = json.loads(wrapper["payload"])
start_time = to_datetime(float(wrapper["start_time"]))
project_id = int(wrapper["project_id"])
Expand Down Expand Up @@ -427,9 +496,10 @@ def create_with_partitions(
partitions: Mapping[Partition, int],
) -> ProcessingStrategy[KafkaPayload]:
def process_message(message: Message[KafkaPayload]) -> None:
assert isinstance(message.value, BrokerValue)
try:
wrapper = msgpack.unpackb(message.payload.value)
_process_message(wrapper)
_process_message(message.value.timestamp, wrapper)
except Exception:
logger.exception("Failed to process message payload")

Expand Down
38 changes: 38 additions & 0 deletions src/sentry/monitors/tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import logging

from arroyo import Topic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from django.conf import settings
from django.utils import timezone

from sentry.constants import ObjectStatus
from sentry.tasks.base import instrumented_task
from sentry.utils import metrics
from sentry.utils.arroyo_producer import SingletonProducer
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

from .models import (
CheckInStatus,
Expand Down Expand Up @@ -40,6 +45,39 @@
SUBTITLE_DATETIME_FORMAT = "%b %d, %I:%M %p"


def _get_monitor_checkin_producer() -> KafkaProducer:
cluster_name = get_topic_definition(settings.KAFKA_INGEST_MONITORS)["cluster"]
producer_config = get_kafka_producer_cluster_options(cluster_name)
producer_config.pop("compression.type", None)
producer_config.pop("message.max.bytes", None)
return KafkaProducer(build_kafka_configuration(default_config=producer_config))


_checkin_producer = SingletonProducer(_get_monitor_checkin_producer)


@instrumented_task(name="sentry.monitors.tasks.produce_clock_pulse")
def produce_clock_pulse(current_datetime=None):
"""
This task is run once a minute when
settings.SENTRY_MONITORS_HIGH_VOLUME_MODE is disabled. It produces a clock
pulse to the check-ins topic
"""
if current_datetime is None:
current_datetime = timezone.now()

if settings.SENTRY_EVENTSTREAM != "sentry.eventstream.kafka.KafkaEventStream":
# If we're not running Kafka then we're just in dev. Directly dispatch the
# write to the issue platform directly
check_missing.delay(current_datetime)
check_timeout.delay(current_datetime)
return

# Produce the pulse into the topic
payload = KafkaPayload(None, b"", [])
_checkin_producer.produce(Topic(settings.KAFKA_INGEST_MONITORS), payload)


@instrumented_task(name="sentry.monitors.tasks.check_missing", time_limit=15, soft_time_limit=10)
def check_missing(current_datetime=None):
if current_datetime is None:
Expand Down

0 comments on commit b851653

Please sign in to comment.