diff --git a/helpers/checkpoint_logger.py b/helpers/checkpoint_logger.py deleted file mode 100644 index b98977264..000000000 --- a/helpers/checkpoint_logger.py +++ /dev/null @@ -1,98 +0,0 @@ -import logging -import time -from enum import Enum, auto - -import sentry_sdk -from shared.metrics import metrics - -logger = logging.getLogger(__name__) - - -class UploadFlow(str, Enum): - UPLOAD_TASK_BEGIN = auto() - PROCESSING_BEGIN = auto() - INITIAL_PROCESSING_COMPLETE = auto() - BATCH_PROCESSING_COMPLETE = auto() - PROCESSING_COMPLETE = auto() - NOTIFIED = auto() - - -def _get_milli_timestamp(): - return time.time_ns() // 1000000 - - -def _kwargs_key(cls): - return f"checkpoints_{cls.__name__}" - - -def from_kwargs(cls, kwargs, strict=False): - data = kwargs.get(_kwargs_key(cls), {}) - - # Make sure these checkpoints were made with the same flow - for key in data.keys(): - if key not in iter(cls): - raise ValueError(f"Checkpoint {key} not part of flow `{cls.__name__}`") - - return CheckpointLogger(cls, data, strict) - - -class CheckpointLogger: - def __init__(self, cls, data=None, strict=False): - self.cls = cls - self.data = data if data else {} - self.kwargs_key = _kwargs_key(self.cls) - self.strict = strict - - def _error(self, msg): - # When a new version of worker rolls out, it will pick up tasks that - # may have been enqueued by the old worker and be missing checkpoints - # data. At least for that reason, we want to allow failing softly. 
- metrics.incr("worker.checkpoint_logger.error") - if self.strict: - raise ValueError(msg) - else: - logger.warning(msg) - - def _validate_checkpoint(self, checkpoint): - if checkpoint.__class__ != self.cls: - # This error is not ignored when `self.strict==False` because it's definitely - # a code mistake - raise ValueError( - f"Checkpoint {checkpoint} not part of flow `{self.cls.__name__}`" - ) - - def _subflow_duration(self, start, end): - self._validate_checkpoint(start) - self._validate_checkpoint(end) - if start not in self.data: - return self._error( - f"Cannot compute duration; missing start checkpoint {start}" - ) - elif end not in self.data: - return self._error(f"Cannot compute duration; missing end checkpoint {end}") - elif end.value <= start.value: - # This error is not ignored when `self.strict==False` because it's definitely - # a code mistake - raise ValueError( - f"Cannot compute duration; end {end} is not after start {start}" - ) - - return self.data[end] - self.data[start] - - def log(self, checkpoint, ignore_repeat=False, kwargs=None): - if checkpoint not in self.data: - self._validate_checkpoint(checkpoint) - self.data[checkpoint] = _get_milli_timestamp() - elif not ignore_repeat: - self._error(f"Already recorded checkpoint {checkpoint}") - - if kwargs is not None: - kwargs[self.kwargs_key] = self.data - - return self - - def submit_subflow(self, metric, start, end): - duration = self._subflow_duration(start, end) - sentry_sdk.set_measurement(metric, duration, "milliseconds") - - return self diff --git a/helpers/checkpoint_logger/__init__.py b/helpers/checkpoint_logger/__init__.py new file mode 100644 index 000000000..a02a7c3da --- /dev/null +++ b/helpers/checkpoint_logger/__init__.py @@ -0,0 +1,388 @@ +import functools +import itertools +import logging +import time +from enum import Enum, auto +from typing import ( + Any, + Callable, + ClassVar, + Generic, + Iterable, + Mapping, + MutableMapping, + Optional, + TypeAlias, + TypeVar, +) + +import 
sentry_sdk +from shared.metrics import metrics + +logger = logging.getLogger(__name__) + + +T = TypeVar("T", bound="BaseFlow") +TSubflows: TypeAlias = Mapping[T, Iterable[tuple[str, T]]] + + +class BaseFlow(Enum): + """ + Base class for a flow. Defines optional functions which are added by the + @success_events, @failure_events, @subflows, and @reliability_counters + decorators to (mostly) appease mypy. + """ + + _subflows: Callable[[], TSubflows] + _success_events: Callable[[], Iterable[T]] + _failure_events: Callable[[], Iterable[T]] + is_success: ClassVar[Callable[[T], bool]] + is_failure: ClassVar[Callable[[T], bool]] + log_counters: ClassVar[Callable[[T], None]] + + +TClassDecorator: TypeAlias = Callable[[type[T]], type[T]] + + +def failure_events(*args: str) -> TClassDecorator: + """ + Class decorator that designates some events as terminal failure conditions. + + @failure_events('ERROR') + class MyEnum(str, Enum): + BEGIN: auto() + CHECKPOINT: auto() + ERROR: auto() + FINISHED: auto() + assert MyEnum.ERROR.is_failure() + """ + + def class_decorator(klass: type[T]) -> type[T]: + def _failure_events() -> Iterable[T]: + return {v for k, v in klass.__members__.items() if k in args} + + def is_failure(obj: T) -> bool: + return obj in _failure_events() + + # `_failure_events` is a cached function rather than a data member so + # that it is not processed as if it's a value from the enum. + klass._failure_events = functools.lru_cache(maxsize=1)(_failure_events) + klass.is_failure = is_failure + + return klass + + return class_decorator + + +def success_events(*args: str) -> TClassDecorator: + """ + Class decorator that designates some events as terminal success conditions. 
+ + @success_events('FINISHED') + class MyEnum(str, Enum): + BEGIN: auto() + CHECKPOINT: auto() + ERROR: auto() + FINISHED: auto() + assert MyEnum.FINISHED.is_success() + """ + + def class_decorator(klass: type[T]) -> type[T]: + def _success_events() -> Iterable[T]: + return {v for k, v in klass.__members__.items() if k in args} + + def is_success(obj: T) -> bool: + return obj in _success_events() + + # `_success_events` is a cached function rather than a data member so + # that it is not processed as if it's a value from the enum. + klass._success_events = functools.lru_cache(maxsize=1)(_success_events) + klass.is_success = is_success + + return klass + + return class_decorator + + +def subflows(*args: tuple[str, str, str]) -> TClassDecorator: + """ + Class decorator that defines a set of interesting subflows which should be + logged as well as the name each should be logged with. It is expected that + you invoke this **after** @success_events() and/or @failure_events(). + + @success_events('FINISH') + @subflows( + ('first_subflow', 'BEGIN', 'CHECKPOINT_A'), + ('second_subflow', 'CHECKPOINT_A', 'FINISH') + ) + class MyEnum(str, Enum): + BEGIN: auto() + CHECKPOINT: auto() + ERROR: auto() + FINISHED: auto() + + A subflow from the first event to each terminal event (success and failure) is + created implicitly with names like 'MyEnum_BEGIN_to_FINISHED'. This name can be + overridden by defining the subflow explicitly. + """ + + def class_decorator(klass: type[T]) -> type[T]: + def _subflows() -> TSubflows: + # We get our subflows in the form: [(metric, begin, end)] + # We want them in the form: {end: [(metric, begin)]} + # The first step of munging is to group by end + key_on_end = lambda x: x[2] + sorted_by_end = sorted(args, key=key_on_end) + grouped_by_end = itertools.groupby(args, key=key_on_end) + + enum_vals = klass.__members__ + + subflows = {} + for end, group in grouped_by_end: + # grouped_by_end is not a simple dict so we create our own. 
+ # `begin` and `end` are still strings at this point so we also want to convert + # them to enum values. + subflows[enum_vals[end]] = list( + ((metric, enum_vals[begin]) for metric, begin, _ in group) + ) + + # The first enum value is the beginning of the flow, no matter what + # branches it takes. We want to automatically define a subflow from + # this beginning to each terminal checkpoint (failures/successes) + # unless the user provided one already. + flow_begin = next(iter(enum_vals.values())) + + # `klass._failure_events` comes from the `@failure_events` decorator + if hasattr(klass, "_failure_events"): + # mypy thinks klass._failure_events == klass + for end in klass._failure_events(): # type: ignore[operator] + flows_ending_here = subflows.setdefault( + end, [] + ) # [(metric, begin)] + if not any((x[1] == flow_begin for x in flows_ending_here)): + flows_ending_here.append( + ( + f"{klass.__name__}_{flow_begin.name}_to_{end.name}", + flow_begin, + ) + ) + + # `klass._success_events` comes from the `@success_events` decorator + if hasattr(klass, "_success_events"): + # mypy thinks klass._success_events == klass + for end in klass._success_events(): # type: ignore[operator] + flows_ending_here = subflows.setdefault( + end, [] + ) # [(metric, begin)] + if not any((x[1] == flow_begin for x in flows_ending_here)): + flows_ending_here.append( + ( + f"{klass.__name__}_{flow_begin.name}_to_{end.name}", + flow_begin, + ) + ) + + return subflows + + klass._subflows = functools.lru_cache(maxsize=1)(_subflows) + return klass + + return class_decorator + + +def reliability_counters(klass: type[T]) -> type[T]: + """ + Class decorator that enables computing success/failure rates for a flow. It + is expected that you invoke this **after** @success_events and/or + @failure_events. 
+ + @success_events('FINISHED') + @failure_events('ERROR') + @reliability_counters + class MyEnum(str, Enum): + BEGIN: auto() + CHECKPOINT: auto() + ERROR: auto() + FINISHED: auto() + MyEnum.BEGIN.log_counters() # increments "MyEnum.begun" counter + MyEnum.ERROR.log_counters() # increments "MyEnum.failed" counter + MyEnum.FINISHED.log_counters() # increments "MyEnum.succeeded" counter + + A "MyEnum.ended" counter is incremented for both success and failure events. + This counter can be compared to "MyEnum.begun" to detect if any branches + aren't instrumented. + """ + + def log_counters(obj: T) -> None: + metrics.incr(f"{klass.__name__}.events.{obj.name}") + + # If this is the first checkpoint, increment the number of flows we've begun + if obj == next(iter(klass.__members__.values())): + metrics.incr(f"{klass.__name__}.total.begun") + return + + is_failure = hasattr(obj, "is_failure") and obj.is_failure() + is_success = hasattr(obj, "is_success") and obj.is_success() + is_terminal = is_failure or is_success + + if is_failure: + metrics.incr(f"{klass.__name__}.total.failed") + elif is_success: + metrics.incr(f"{klass.__name__}.total.succeeded") + + if is_terminal: + metrics.incr(f"{klass.__name__}.total.ended") + + klass.log_counters = log_counters + return klass + + +def _get_milli_timestamp() -> int: + return time.time_ns() // 1000000 + + +def _kwargs_key(cls: type[T]) -> str: + return f"checkpoints_{cls.__name__}" + + +class CheckpointLogger(Generic[T]): + """ + CheckpointLogger is a class that tracks latencies/reliabilities for higher-level + "flows" that don't map well to auto-instrumented tracing. It can be + reconstructed from its serialized data allowing you to begin a flow on one host + and log its completion on another (as long as clock drift is marginal). + + See `UploadFlow` for an example of defining a flow. 
It's recommended that you + define your flow with the decorators in this file: + - `@success_events()`, `@failure_events()`: designate some events as terminal + success/fail states of your flow. + - `@subflows()`: pre-define subflows that get submitted automatically; implicitly + define a flow from the first event to each success or failure event + - `@reliability_counters`: increment event, start, finish, success, failure counters + for defining reliability metrics for your flow + + It is expected that `@subflows()` and `@reliability_counters` be invoked **after** + `@success_events()` and/or `@failure_events`. + + # Simple usage + checkpoints = CheckpointLogger(UploadFlow) + checkpoints.log(UploadFlow.BEGIN) + ... + # each member returns `self` so you can chain `log` and `submit_subflow` calls + # calling `submit_subflow` manually is unnecessary when using `@subflows()` + checkpoints + .log(UploadFlow.PROCESSING_BEGIN) + .submit_subflow("time_before_processing", UploadFlow.BEGIN, UploadFlow.PROCESSING_BEGIN) + + # More complicated usage + # - Creates logger from `kwargs` + # - logs `UploadFlow.BEGIN` directly into `kwargs` + # - ignores if `UploadFlow.BEGIN` was already logged (i.e. if this is a task retry) + from_kwargs(UploadFlow, kwargs).log(UploadFlow.BEGIN, kwargs=kwargs, ignore_repeat=True) + next_task(kwargs) + ... 
+ # when using `@failure_events()` and `@subflows()`, an auto-created subflow + # is automatically submitted because `UploadFlow.TOO_MANY_RETRIES` is an error + from_kwargs(UploadFlow, kwargs) + .log(UploadFlow.TOO_MANY_RETRIES) + """ + + _Self = TypeVar("_Self", bound="CheckpointLogger[T]") + + def __init__( + self: _Self, + cls: type[T], + data: Optional[MutableMapping[T, int]] = None, + strict=False, + ): + self.cls = cls + self.data = data if data else {} + self.kwargs_key = _kwargs_key(self.cls) + self.strict = strict + + def _error(self: _Self, msg: str) -> None: + # When a new version of worker rolls out, it will pick up tasks that + # may have been enqueued by the old worker and be missing checkpoints + # data. At least for that reason, we want to allow failing softly. + metrics.incr("worker.checkpoint_logger.error") + if self.strict: + raise ValueError(msg) + else: + logger.warning(msg) + + def _validate_checkpoint(self: _Self, checkpoint: T) -> None: + if checkpoint.__class__ != self.cls: + # This error is not ignored when `self.strict==False` because it's definitely + # a code mistake + raise ValueError( + f"Checkpoint {checkpoint} not part of flow `{self.cls.__name__}`" + ) + + def _subflow_duration(self: _Self, start: T, end: T) -> Optional[int]: + self._validate_checkpoint(start) + self._validate_checkpoint(end) + if start not in self.data: + self._error(f"Cannot compute duration; missing start checkpoint {start}") + return None + elif end not in self.data: + self._error(f"Cannot compute duration; missing end checkpoint {end}") + return None + elif end.value <= start.value: + # This error is not ignored when `self.strict==False` because it's definitely + # a code mistake + raise ValueError( + f"Cannot compute duration; end {end} is not after start {start}" + ) + + return self.data[end] - self.data[start] + + def log( + self: _Self, + checkpoint: T, + ignore_repeat: bool = False, + kwargs: Optional[MutableMapping[str, Any]] = None, + ) -> _Self: + if 
checkpoint not in self.data: + self._validate_checkpoint(checkpoint) + self.data[checkpoint] = _get_milli_timestamp() + elif not ignore_repeat: + self._error(f"Already recorded checkpoint {checkpoint}") + + if kwargs is not None: + kwargs[self.kwargs_key] = self.data + + # `self.cls._subflows()` comes from the `@subflows` decorator + # If the flow has pre-defined subflows, we can automatically submit + # any of them that end with the checkpoint we just logged. + if hasattr(self.cls, "_subflows"): + # mypy thinks self.cls._subflows == self.cls + for metric, beginning in self.cls._subflows().get(checkpoint, []): # type: ignore[operator] + self.submit_subflow(metric, beginning, checkpoint) + + # `checkpoint.log_counters()` comes from the `@reliability_counters` + # decorator + # Increment event, start, finish, success, failure counters + if hasattr(checkpoint, "log_counters"): + checkpoint.log_counters() + + return self + + def submit_subflow(self: _Self, metric: str, start: T, end: T) -> _Self: + duration = self._subflow_duration(start, end) + if duration: + sentry_sdk.set_measurement(metric, duration, "milliseconds") + + return self + + +def from_kwargs( + cls: type[T], kwargs: MutableMapping[str, Any], strict: bool = False +) -> CheckpointLogger[T]: + data = kwargs.get(_kwargs_key(cls), {}) + + # Make sure these checkpoints were made with the same flow + for key in data.keys(): + if key not in iter(cls): + raise ValueError(f"Checkpoint {key} not part of flow `{cls.__name__}`") + + return CheckpointLogger(cls, data, strict) diff --git a/helpers/checkpoint_logger/flows.py b/helpers/checkpoint_logger/flows.py new file mode 100644 index 000000000..756b00a57 --- /dev/null +++ b/helpers/checkpoint_logger/flows.py @@ -0,0 +1,52 @@ +from enum import auto + +from helpers.checkpoint_logger import ( + BaseFlow, + failure_events, + reliability_counters, + subflows, + success_events, +) + + +@failure_events( + "TOO_MANY_RETRIES", + "NOTIF_LOCK_ERROR", + 
"NOTIF_NO_VALID_INTEGRATION", + "NOTIF_GIT_CLIENT_ERROR", + "NOTIF_GIT_SERVICE_ERROR", + "NOTIF_TOO_MANY_RETRIES", + "NOTIF_ERROR_NO_REPORT", +) +@success_events( + "SKIPPING_NOTIFICATION", "NOTIFIED", "NO_PENDING_JOBS", "NOTIF_STALE_HEAD" +) +@subflows( + ("time_before_processing", "UPLOAD_TASK_BEGIN", "PROCESSING_BEGIN"), + ("initial_processing_duration", "PROCESSING_BEGIN", "INITIAL_PROCESSING_COMPLETE"), + ( + "batch_processing_duration", + "INITIAL_PROCESSING_COMPLETE", + "BATCH_PROCESSING_COMPLETE", + ), + ("total_processing_duration", "PROCESSING_BEGIN", "PROCESSING_COMPLETE"), + ("notification_latency", "UPLOAD_TASK_BEGIN", "NOTIFIED"), +) +@reliability_counters +class UploadFlow(BaseFlow): + UPLOAD_TASK_BEGIN = auto() + NO_PENDING_JOBS = auto() + TOO_MANY_RETRIES = auto() + PROCESSING_BEGIN = auto() + INITIAL_PROCESSING_COMPLETE = auto() + BATCH_PROCESSING_COMPLETE = auto() + PROCESSING_COMPLETE = auto() + SKIPPING_NOTIFICATION = auto() + NOTIFIED = auto() + NOTIF_LOCK_ERROR = auto() + NOTIF_NO_VALID_INTEGRATION = auto() + NOTIF_GIT_CLIENT_ERROR = auto() + NOTIF_GIT_SERVICE_ERROR = auto() + NOTIF_TOO_MANY_RETRIES = auto() + NOTIF_STALE_HEAD = auto() + NOTIF_ERROR_NO_REPORT = auto() diff --git a/helpers/tests/unit/test_checkpoint_logger.py b/helpers/tests/unit/test_checkpoint_logger.py index 1e8a1becf..3b6e4ec9a 100644 --- a/helpers/tests/unit/test_checkpoint_logger.py +++ b/helpers/tests/unit/test_checkpoint_logger.py @@ -4,27 +4,56 @@ import pytest import sentry_sdk +from shared.utils.test_utils import mock_metrics from helpers.checkpoint_logger import ( + BaseFlow, CheckpointLogger, _get_milli_timestamp, + failure_events, from_kwargs, + reliability_counters, + subflows, + success_events, ) -class TestEnum1(Enum): +@failure_events("BRANCH_1_FAIL") +@success_events("BRANCH_1_SUCCESS", "BRANCH_2_SUCCESS") +@subflows( + ("first_checkpoint", "BEGIN", "CHECKPOINT"), + ("branch_1_to_finish", "BRANCH_1", "BRANCH_1_SUCCESS"), + ("total_branch_1_time", "BEGIN", 
"BRANCH_1_SUCCESS"), + ("total_branch_1_fail_time", "BEGIN", "BRANCH_1_FAIL"), +) +@reliability_counters +class DecoratedEnum(BaseFlow): + BEGIN = auto() + CHECKPOINT = auto() + BRANCH_1 = auto() + BRANCH_1_FAIL = auto() + BRANCH_1_SUCCESS = auto() + BRANCH_2 = auto() + BRANCH_2_SUCCESS = auto() + + +class TestEnum1(BaseFlow): A = auto() B = auto() C = auto() -class TestEnum2(Enum): +class TestEnum2(BaseFlow): A = auto() B = auto() C = auto() class TestCheckpointLogger(unittest.TestCase): + @pytest.fixture(scope="function", autouse=True) + def inject_mocker(request, mocker): + request.mocker = mocker + @patch("time.time_ns", return_value=123456789) def test_get_milli_timestamp(self, mocker): expected_ms = 123456789 // 1000000 @@ -51,18 +80,18 @@ def test_log_multiple_checkpoints(self, mocker): self.assertEqual(checkpoints.data[TestEnum1.B], 9001) self.assertEqual(checkpoints.data[TestEnum1.C], 100000) - def test_log_checkpoint_twice_ahrows(self): + def test_log_checkpoint_twice_throws(self): checkpoints = CheckpointLogger(TestEnum1, strict=True) checkpoints.log(TestEnum1.A) with self.assertRaises(ValueError): checkpoints.log(TestEnum1.A) - def test_log_checkpoint_wrong_enum_throws(self): + def test_log_checkpoint_wrong_enum_throws(self) -> None: checkpoints = CheckpointLogger(TestEnum1, strict=True) with self.assertRaises(ValueError): - checkpoints.log(TestEnum2.A) + checkpoints.log(TestEnum2.A) # type: ignore[arg-type] @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001]) def test_subflow_duration(self, mocker): @@ -173,8 +202,6 @@ def test_log_to_kwargs(self, mock_timestamp): assert kwargs["checkpoints_TestEnum1"][TestEnum1.A] == 1337 assert kwargs["checkpoints_TestEnum1"][TestEnum1.B] == 9001 - pass - @pytest.mark.real_checkpoint_logger @patch("sentry_sdk.set_measurement") @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[9001]) @@ -194,3 +221,124 @@ def test_create_log_oneliner(self, mock_timestamp, mock_sentry): 
mock_sentry.assert_called_with("x", expected_duration, "milliseconds") assert kwargs["checkpoints_TestEnum1"][TestEnum1.A] == 1337 assert kwargs["checkpoints_TestEnum1"][TestEnum1.B] == 9001 + + def test_success_failure_decorators(self): + for val in DecoratedEnum.__members__.values(): + if val in [DecoratedEnum.BRANCH_1_SUCCESS, DecoratedEnum.BRANCH_2_SUCCESS]: + assert val.is_success() + else: + assert not val.is_success() + + if val in [DecoratedEnum.BRANCH_1_FAIL]: + assert val.is_failure() + else: + assert not val.is_failure() + + def test_subflows_decorator(self): + subflows = DecoratedEnum._subflows() + + # No subflows end at these checkpoints + assert DecoratedEnum.BEGIN not in subflows + assert DecoratedEnum.BRANCH_1 not in subflows + assert DecoratedEnum.BRANCH_2 not in subflows + + # `DecoratedEnum.CHECKPOINT` is not a terminal event, but we explicitly + # defined a subflow ending there. + checkpoint_subflows = subflows.get(DecoratedEnum.CHECKPOINT) + assert checkpoint_subflows is not None + assert len(checkpoint_subflows) == 1 + assert checkpoint_subflows[0] == ("first_checkpoint", DecoratedEnum.BEGIN) + + # All terminal events should have a subflow defined for them which + # begins at the flow's first event. `BRANCH_1_FAIL` has had this + # subflow provided by the user, so we should use the user's name. + branch_1_fail_subflows = subflows.get(DecoratedEnum.BRANCH_1_FAIL) + assert branch_1_fail_subflows is not None + assert len(branch_1_fail_subflows) == 1 + assert branch_1_fail_subflows[0] == ( + "total_branch_1_fail_time", + DecoratedEnum.BEGIN, + ) + + # All terminal events should have a subflow defined for them which + # begins at the flow's first event. `BRANCH_1_SUCCESS` has had this + # subflow provided by the user, so we should use the user's name. + # Also, `BRANCH_1_SUCCESS` is the end of a second subflow. Ensure that + # both subflows are present. 
+ branch_1_success_subflows = subflows.get(DecoratedEnum.BRANCH_1_SUCCESS) + assert branch_1_success_subflows is not None + assert len(branch_1_success_subflows) == 2 + assert ("total_branch_1_time", DecoratedEnum.BEGIN) in branch_1_success_subflows + assert ( + "branch_1_to_finish", + DecoratedEnum.BRANCH_1, + ) in branch_1_success_subflows + + # All terminal events should have a subflow defined for them which + # begins at the flow's first event. `BRANCH_2_SUCCESS` has not had this + # subflow provided by the user, so we should use the default name. + branch_2_success_subflows = subflows.get(DecoratedEnum.BRANCH_2_SUCCESS) + assert branch_2_success_subflows is not None + assert len(branch_2_success_subflows) == 1 + assert branch_2_success_subflows[0] == ( + "DecoratedEnum_BEGIN_to_BRANCH_2_SUCCESS", + DecoratedEnum.BEGIN, + ) + + @pytest.mark.real_checkpoint_logger + @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001]) + @patch("sentry_sdk.set_measurement") + def test_subflow_autosubmit(self, mock_sentry, mock_timestamp): + checkpoints = CheckpointLogger(DecoratedEnum) + checkpoints.log(DecoratedEnum.BEGIN) + checkpoints.log(DecoratedEnum.CHECKPOINT) + + expected_duration = 9001 - 1337 + mock_sentry.assert_called_with( + "first_checkpoint", expected_duration, "milliseconds" + ) + + def test_reliability_counters(self): + metrics = mock_metrics(self.mocker) + assert not metrics.data + + checkpoints = CheckpointLogger(DecoratedEnum) + + checkpoints.log(DecoratedEnum.BEGIN) + assert metrics.data["DecoratedEnum.events.BEGIN"] == 1 + assert metrics.data["DecoratedEnum.total.begun"] == 1 + assert "DecoratedEnum.total.succeeded" not in metrics.data + assert "DecoratedEnum.total.failed" not in metrics.data + assert "DecoratedEnum.total.ended" not in metrics.data + + # Nothing special about `CHECKPOINT` - no counters should change + checkpoints.log(DecoratedEnum.CHECKPOINT) + assert metrics.data["DecoratedEnum.events.CHECKPOINT"] == 1 + assert 
metrics.data["DecoratedEnum.total.begun"] == 1 + assert "DecoratedEnum.total.succeeded" not in metrics.data + assert "DecoratedEnum.total.failed" not in metrics.data + assert "DecoratedEnum.total.ended" not in metrics.data + + # Failures should increment both `failed` and `ended` + checkpoints.log(DecoratedEnum.BRANCH_1_FAIL) + assert metrics.data["DecoratedEnum.events.BRANCH_1_FAIL"] == 1 + assert metrics.data["DecoratedEnum.total.begun"] == 1 + assert metrics.data["DecoratedEnum.total.failed"] == 1 + assert metrics.data["DecoratedEnum.total.ended"] == 1 + assert "DecoratedEnum.total.succeeded" not in metrics.data + + # Successes should increment both `succeeded` and `ended` + checkpoints.log(DecoratedEnum.BRANCH_1_SUCCESS) + assert metrics.data["DecoratedEnum.events.BRANCH_1_SUCCESS"] == 1 + assert metrics.data["DecoratedEnum.total.begun"] == 1 + assert metrics.data["DecoratedEnum.total.failed"] == 1 + assert metrics.data["DecoratedEnum.total.ended"] == 2 + assert metrics.data["DecoratedEnum.total.succeeded"] == 1 + + # A different success path should also increment `succeeded` and `ended` + checkpoints.log(DecoratedEnum.BRANCH_2_SUCCESS) + assert metrics.data["DecoratedEnum.events.BRANCH_2_SUCCESS"] == 1 + assert metrics.data["DecoratedEnum.total.begun"] == 1 + assert metrics.data["DecoratedEnum.total.failed"] == 1 + assert metrics.data["DecoratedEnum.total.ended"] == 3 + assert metrics.data["DecoratedEnum.total.succeeded"] == 2 diff --git a/tasks/notify.py b/tasks/notify.py index 319ff44d9..34faa43ed 100644 --- a/tasks/notify.py +++ b/tasks/notify.py @@ -15,8 +15,8 @@ from app import celery_app from database.enums import CommitErrorTypes, Decoration from database.models import Commit, Pull -from helpers.checkpoint_logger import UploadFlow from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs +from helpers.checkpoint_logger.flows import UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.save_commit_error 
import save_commit_error from services.activation import activate_user @@ -61,6 +61,7 @@ async def run_async( "Not notifying because there are seemingly other jobs being processed yet", extra=dict(repoid=repoid, commitid=commitid), ) + # Should we log an UploadFlow checkpoint here? return { "notified": False, "notifications": None, @@ -93,6 +94,7 @@ async def run_async( lock_acquired=lock_acquired, ), ), + checkpoints_from_kwargs(UploadFlow, kwargs).log(UploadFlow.NOTIF_LOCK_ERROR) return { "notified": False, "notifications": None, @@ -125,6 +127,9 @@ async def run_async_within_lock( "Unable to start notifications because repo doesn't have a valid bot", extra=dict(repoid=repoid, commit=commitid), ) + checkpoints_from_kwargs(UploadFlow, kwargs).log( + UploadFlow.NOTIF_NO_VALID_INTEGRATION + ) return {"notified": False, "notifications": None, "reason": "no_valid_bot"} if current_yaml is None: current_yaml = await get_current_yaml(commit, repository_service) @@ -140,6 +145,9 @@ async def run_async_within_lock( "Unable to fetch CI results due to a client problem. Not notifying user", extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), ) + checkpoints_from_kwargs(UploadFlow, kwargs).log( + UploadFlow.NOTIF_GIT_CLIENT_ERROR + ) return { "notified": False, "notifications": None, @@ -150,6 +158,9 @@ async def run_async_within_lock( "Unable to fetch CI results due to server issues. 
Not notifying user", extra=dict(repoid=commit.repoid, commit=commit.commitid), ) + checkpoints_from_kwargs(UploadFlow, kwargs).log( + UploadFlow.NOTIF_GIT_SERVICE_ERROR + ) return { "notified": False, "notifications": None, @@ -180,6 +191,9 @@ async def run_async_within_lock( current_yaml=current_yaml.to_dict(), ), ) + checkpoints_from_kwargs(UploadFlow, kwargs).log( + UploadFlow.NOTIF_TOO_MANY_RETRIES + ) return { "notified": False, "notifications": None, @@ -217,6 +231,9 @@ async def run_async_within_lock( pull_head=enriched_pull.provider_pull["head"]["commitid"], ), ) + checkpoints_from_kwargs(UploadFlow, kwargs).log( + UploadFlow.NOTIF_STALE_HEAD + ) return { "notified": False, "notifications": None, @@ -233,6 +250,9 @@ async def run_async_within_lock( commit, report_class=ReadOnlyReport ) if head_report is None and empty_upload is None: + checkpoints_from_kwargs(UploadFlow, kwargs).log( + UploadFlow.NOTIF_ERROR_NO_REPORT + ) return { "notified": False, "notifications": None, @@ -255,13 +275,7 @@ async def run_async_within_lock( enriched_pull, empty_upload, ) - checkpoints_from_kwargs(UploadFlow, kwargs).log( - UploadFlow.NOTIFIED - ).submit_subflow( - "notification_latency", - UploadFlow.UPLOAD_TASK_BEGIN, - UploadFlow.NOTIFIED, - ) + checkpoints_from_kwargs(UploadFlow, kwargs).log(UploadFlow.NOTIFIED) log.info( "Notifications done", extra=dict( diff --git a/tasks/tests/unit/test_notify_task.py b/tasks/tests/unit/test_notify_task.py index b879c95cf..b7ae3089f 100644 --- a/tasks/tests/unit/test_notify_task.py +++ b/tasks/tests/unit/test_notify_task.py @@ -18,7 +18,8 @@ PullFactory, RepositoryFactory, ) -from helpers.checkpoint_logger import CheckpointLogger, UploadFlow, _kwargs_key +from helpers.checkpoint_logger import CheckpointLogger, _kwargs_key +from helpers.checkpoint_logger.flows import UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from services.decoration import DecorationDetails from services.notification import 
NotificationService diff --git a/tasks/tests/unit/test_upload_finisher_task.py b/tasks/tests/unit/test_upload_finisher_task.py index a60c3f0f0..03e1c417e 100644 --- a/tasks/tests/unit/test_upload_finisher_task.py +++ b/tasks/tests/unit/test_upload_finisher_task.py @@ -12,7 +12,8 @@ RepositoryFactory, UploadFactory, ) -from helpers.checkpoint_logger import CheckpointLogger, UploadFlow, _kwargs_key +from helpers.checkpoint_logger import CheckpointLogger, _kwargs_key +from helpers.checkpoint_logger.flows import UploadFlow from tasks.upload_finisher import ReportService, UploadFinisherTask here = Path(__file__) diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py index 789a8604d..15ba886fc 100644 --- a/tasks/tests/unit/test_upload_task.py +++ b/tasks/tests/unit/test_upload_task.py @@ -16,7 +16,8 @@ from database.models.reports import CommitReport from database.tests.factories import CommitFactory, OwnerFactory, RepositoryFactory from database.tests.factories.core import ReportFactory -from helpers.checkpoint_logger import CheckpointLogger, UploadFlow, _kwargs_key +from helpers.checkpoint_logger import CheckpointLogger, _kwargs_key +from helpers.checkpoint_logger.flows import UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from services.archive import ArchiveService from services.report import NotReadyToBuildReportYetError, ReportService diff --git a/tasks/upload.py b/tasks/upload.py index 2787524c1..3c1778752 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -20,8 +20,9 @@ from app import celery_app from database.enums import CommitErrorTypes from database.models import Commit -from helpers.checkpoint_logger import UploadFlow, _kwargs_key +from helpers.checkpoint_logger import _kwargs_key from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs +from helpers.checkpoint_logger.flows import UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from 
helpers.save_commit_error import save_commit_error from services.archive import ArchiveService @@ -151,7 +152,7 @@ async def run_async( ): # If we're a retry, kwargs will already have our first checkpoint. # If not, log it directly into kwargs so we can pass it onto other tasks - checkpoints_from_kwargs(UploadFlow, kwargs).log( + checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs).log( UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs, ignore_repeat=True ) @@ -204,6 +205,7 @@ async def run_async( "Not retrying since there are likely no jobs that need scheduling", extra=dict(commit=commitid, repoid=repoid), ) + checkpoints.log(UploadFlow.NO_PENDING_JOBS) return { "was_setup": False, "was_updated": False, @@ -214,6 +216,7 @@ async def run_async( "Not retrying since we already had too many retries", extra=dict(commit=commitid, repoid=repoid), ) + checkpoints.log(UploadFlow.TOO_MANY_RETRIES) return { "was_setup": False, "was_updated": False, @@ -280,11 +283,7 @@ async def run_async_within_lock( try: checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs) - checkpoints.log(UploadFlow.PROCESSING_BEGIN).submit_subflow( - "time_before_processing", - UploadFlow.UPLOAD_TASK_BEGIN, - UploadFlow.PROCESSING_BEGIN, - ) + checkpoints.log(UploadFlow.PROCESSING_BEGIN) except ValueError as e: log.warning(f"CheckpointLogger failed to log/submit", extra=dict(error=e)) @@ -378,11 +377,6 @@ async def run_async_within_lock( ) else: checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE) - checkpoints.submit_subflow( - "initial_processing_duration", - UploadFlow.PROCESSING_BEGIN, - UploadFlow.INITIAL_PROCESSING_COMPLETE, - ) log.info( "Not scheduling task because there were no arguments were found on redis", extra=dict( @@ -461,11 +455,6 @@ def schedule_task( checkpoint_data = None if checkpoints: checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE) - checkpoints.submit_subflow( - "initial_processing_duration", - UploadFlow.PROCESSING_BEGIN, - UploadFlow.INITIAL_PROCESSING_COMPLETE, 
- ) checkpoint_data = checkpoints.data finish_sig = upload_finisher_task.signature( diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index 6b33a6eb3..a4521996f 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -13,8 +13,9 @@ from app import celery_app from database.models import Commit, Pull -from helpers.checkpoint_logger import UploadFlow, _kwargs_key +from helpers.checkpoint_logger import _kwargs_key from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs +from helpers.checkpoint_logger.flows import UploadFlow from services.comparison import get_or_create_comparison from services.redis import get_redis_connection from services.report import ReportService @@ -57,11 +58,7 @@ async def run_async( ): try: checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs) - checkpoints.log(UploadFlow.BATCH_PROCESSING_COMPLETE).submit_subflow( - "batch_processing_duration", - UploadFlow.INITIAL_PROCESSING_COMPLETE, - UploadFlow.BATCH_PROCESSING_COMPLETE, - ) + checkpoints.log(UploadFlow.BATCH_PROCESSING_COMPLETE) except ValueError as e: log.warning(f"CheckpointLogger failed to log/submit", extra=dict(error=e)) @@ -183,11 +180,10 @@ async def finish_reports_processing( commit.state = "skipped" if checkpoints: - checkpoints.log(UploadFlow.PROCESSING_COMPLETE).submit_subflow( - "total_processing_duration", - UploadFlow.PROCESSING_BEGIN, - UploadFlow.PROCESSING_COMPLETE, - ) + checkpoints.log(UploadFlow.PROCESSING_COMPLETE) + if not notifications_called: + checkpoints.log(UploadFlow.SKIPPING_NOTIFICATION) + return {"notifications_called": notifications_called} def should_call_notifications(