Skip to content

Commit

Permalink
ref(crons): Simplify monitor clock implementation
Browse files Browse the repository at this point in the history
This is a follow up to GH-54204 as suggested by @fpacifici

#53661 (comment)

> Can we just have the pulse message running in both modes and treat
> everything as high volume mode?

Instead of having two modes, we can simply always use the same logic for
dispatching the monitor tasks on the minute roll-over, using the
consumer as a clock.

Previously the worry here was that in low-volume check-in situations
nothing would drive the clock and we would need to have an external
clock, with a different way to dispatch the tasks. But there is no need
for a different way to dispatch the tasks, we can have an external clock
that pulses messages into the topic and we can simply use the same logic
already implemented to use the topic messages as a clock.

This change removes the concept of "high volume" / "low volume" and adds
the concept of a "clock_pulse" message to the consumer.

In a follow up PR we will introduce the celery beat task which produces
the clock_pulse messages.
  • Loading branch information
evanpurkhiser committed Aug 10, 2023
1 parent 6612cd8 commit a2b55a6
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 41 deletions.
84 changes: 50 additions & 34 deletions src/sentry/monitors/consumers/monitor_consumer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import annotations

import logging
import uuid
from datetime import datetime, timedelta
from typing import Dict, Mapping, Optional, TypedDict
from typing import Dict, Literal, Mapping, Optional, TypedDict

import msgpack
import sentry_sdk
Expand Down Expand Up @@ -51,19 +53,24 @@
CHECKIN_QUOTA_LIMIT = 5
CHECKIN_QUOTA_WINDOW = 60

# This key is used when SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled and we
# trigger the monitor tasks as a side-effect of check-ins coming in. It is used
# to store he last timestamp that the tasks were triggered.
HIGH_VOLUME_LAST_TRIGGER_TS_KEY = "sentry.monitors.last_tasks_ts"
# This key is used to store the last timestamp at which the tasks were triggered.
MONITOR_TASKS_LAST_TRIGGERED_KEY = "sentry.monitors.last_tasks_ts"


class CheckinMessage(TypedDict):
# TODO(epurkhiser): We should make this required and ensure the message
# produced by relay includes this message type
message_type: NotRequired[Literal["check_in"]]
payload: str
start_time: str
start_time: float
project_id: str
sdk: str


class ClockPulseMessage(TypedDict):
message_type: Literal["clock_pulse"]


class CheckinTrace(TypedDict):
trace_id: str

Expand Down Expand Up @@ -147,6 +154,23 @@ def _ensure_monitor_with_config(


def _dispatch_tasks(ts: datetime):
"""
Dispatch monitor tasks triggered by the consumer clock. These will run
after the MONITOR_TASK_DELAY (in seconds). This gives some breathing
room for check-ins to start and not be EXACTLY on the minute.
These tasks are triggered via the consumer processing check-ins. This
allows the monitor tasks to be synchronized to any backlog of check-ins
that are being processed.
To ensure these tasks are always triggered there is an additional celery
beat task that produces a clock pulse message into the topic that can be
used to trigger these tasks when there is a low volume of check-ins. It is
however, preferred to have a high volume of check-ins, so we do not need to
rely on celery beat, which in some cases may fail to trigger (such as in
sentry.io, when we deploy we restart the celery beat worker and it will
skip any tasks it missed)
"""
# For now we're going to have this do nothing. We want to validate that
# we're not going to be skipping any check-ins
return
Expand All @@ -155,23 +179,9 @@ def _dispatch_tasks(ts: datetime):
# check_timeout.delay(current_datetime=ts)


def _handle_clock_pulse_task_trigger(ts: datetime):
def _try_monitor_tasks_trigger(ts: datetime):
"""
Handles clock pulse messages. These pulses are generated by the
`sentry.monitors.tasks.clock_pulse` tasks which runs every minute. Clock
pulses will NOT be generated when SENTRY_MONITORS_HIGH_VOLUME_MODE is
enabled.
This function is responsible for dispatching the missed check-in and timed
out check-in detection tasks.
"""
_dispatch_tasks(ts)


def _try_handle_high_volume_task_trigger(ts: datetime):
"""
When SENTRY_MONITORS_HIGH_VOLUME_MODE is enabled we use each check-in
message as a pseudo clock.
Handles triggering the monitor tasks when we've rolled over the minute.
"""
redis_client = redis.redis_clusters.get(settings.SENTRY_MONITORS_REDIS_CLUSTER)

Expand All @@ -180,7 +190,7 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
reference_datetime = ts.replace(second=0, microsecond=0)
reference_ts = int(reference_datetime.timestamp())

precheck_last_ts = redis_client.get(HIGH_VOLUME_LAST_TRIGGER_TS_KEY)
precheck_last_ts = redis_client.get(MONITOR_TASKS_LAST_TRIGGERED_KEY)
if precheck_last_ts is not None:
precheck_last_ts = int(precheck_last_ts)

Expand All @@ -194,7 +204,7 @@ def _try_handle_high_volume_task_trigger(ts: datetime):

# GETSET is atomic. This is critical to avoid another consumer also
# processing the same tick.
last_ts = redis_client.getset(HIGH_VOLUME_LAST_TRIGGER_TS_KEY, reference_ts)
last_ts = redis_client.getset(MONITOR_TASKS_LAST_TRIGGERED_KEY, reference_ts)
if last_ts is not None:
last_ts = int(last_ts)

Expand All @@ -212,7 +222,7 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
"monitors.consumer.clock_tick",
extra={"reference_datetime": str(reference_datetime)},
)
metrics.gauge("monitors.task.high_volume_clock_delay", total_delay, sample_rate=1.0)
metrics.gauge("monitors.task.clock_delay", total_delay, sample_rate=1.0)

# If more than exactly a minute has passed then we've skipped a
# task run, report that to sentry, it is a problem.
Expand All @@ -225,15 +235,21 @@ def _try_handle_high_volume_task_trigger(ts: datetime):
_dispatch_tasks(ts)


def _process_message(ts: datetime, wrapper: CheckinMessage) -> None:
# When running in high volume mode we will not consume clock pulses (The
# clock_pulse task is not enabled). Instead we use each check-in message as
# a means for triggering our tasks.
if settings.SENTRY_MONITORS_HIGH_VOLUME_MODE:
try:
_try_handle_high_volume_task_trigger(ts)
except Exception:
logger.exception("Failed try high-volume task trigger", exc_info=True)
def _process_message(ts: datetime, wrapper: CheckinMessage | ClockPulseMessage) -> None:
# XXX: Relay does not attach a message type, to properly discriminate the
# type we add it by default here. This can be removed once the message_type
# is guaranteed
if "message_type" not in wrapper:
wrapper["message_type"] = "check_in"

try:
_try_monitor_tasks_trigger(ts)
except Exception:
logger.exception("Failed to trigger monitor tasks", exc_info=True)

# Nothing else to do with clock pulses
if wrapper["message_type"] == "clock_pulse":
return

params: CheckinPayload = json.loads(wrapper["payload"])
start_time = to_datetime(float(wrapper["start_time"]))
Expand Down
36 changes: 29 additions & 7 deletions tests/sentry/monitors/test_monitor_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,28 @@ def send_checkin(
)
)

def send_clock_pulse(
    self,
    ts: Optional[datetime] = None,
) -> None:
    """Feed a synthetic clock_pulse message through the check-in consumer strategy."""
    pulse_ts = datetime.now() if ts is None else ts

    # Minimal wrapper: clock pulses carry only their message type, no payload.
    pulse_wrapper = {"message_type": "clock_pulse"}

    test_partition = Partition(Topic("test"), 0)
    strategy = StoreMonitorCheckInStrategyFactory().create_with_partitions(
        mock.Mock(), {test_partition: 0}
    )
    strategy.submit(
        Message(
            BrokerValue(
                KafkaPayload(b"fake-key", msgpack.packb(pulse_wrapper), []),
                test_partition,
                1,
                pulse_ts,
            )
        )
    )

def test_payload(self) -> None:
monitor = self._create_monitor(slug="my-monitor")

Expand Down Expand Up @@ -548,10 +570,9 @@ def test_organization_killswitch(self):

assert not MonitorCheckIn.objects.filter(guid=self.guid).exists()

@override_settings(SENTRY_MONITORS_HIGH_VOLUME_MODE=True)
@mock.patch("sentry.monitors.consumers.monitor_consumer.CHECKIN_QUOTA_LIMIT", 20)
@mock.patch("sentry.monitors.consumers.monitor_consumer._dispatch_tasks")
def test_high_volume_task_trigger(self, dispatch_tasks):
def test_monitor_task_trigger(self, dispatch_tasks):
monitor = self._create_monitor(slug="my-monitor")

assert dispatch_tasks.call_count == 0
Expand Down Expand Up @@ -581,17 +602,18 @@ def test_high_volume_task_trigger(self, dispatch_tasks):
assert dispatch_tasks.call_count == 3
capture_message.assert_called_with("Monitor task dispatch minute skipped")

# A clock pulse message also triggers the tasks
self.send_clock_pulse(ts=now + timedelta(minutes=4))
assert dispatch_tasks.call_count == 4

# An exception dispatching the tasks does NOT cause ingestion to fail
with mock.patch("sentry.monitors.consumers.monitor_consumer.logger") as logger:
dispatch_tasks.side_effect = Exception()
self.send_checkin(monitor.slug, ts=now + timedelta(minutes=4))
self.send_checkin(monitor.slug, ts=now + timedelta(minutes=5))
assert MonitorCheckIn.objects.filter(guid=self.guid).exists()
logger.exception.assert_called_with(
"Failed try high-volume task trigger", exc_info=True
)
logger.exception.assert_called_with("Failed to trigger monitor tasks", exc_info=True)
dispatch_tasks.side_effect = None

@override_settings(SENTRY_MONITORS_HIGH_VOLUME_MODE=True)
@mock.patch("sentry.monitors.consumers.monitor_consumer._dispatch_tasks")
def test_monitor_task_trigger_partition_desync(self, dispatch_tasks):
"""
Expand Down

0 comments on commit a2b55a6

Please sign in to comment.