From a48274cc6dbce92f9770b1de8e9703a3f230d287 Mon Sep 17 00:00:00 2001 From: Matt Hammerly Date: Tue, 24 Oct 2023 20:51:58 -0700 Subject: [PATCH 1/2] feat: add prometheus instrumentation to checkpointlogger --- helpers/checkpoint_logger/__init__.py | 51 +++++- helpers/tests/unit/test_checkpoint_logger.py | 159 ++++++++++++++++++- 2 files changed, 204 insertions(+), 6 deletions(-) diff --git a/helpers/checkpoint_logger/__init__.py b/helpers/checkpoint_logger/__init__.py index 143d78593..d56a8e20c 100644 --- a/helpers/checkpoint_logger/__init__.py +++ b/helpers/checkpoint_logger/__init__.py @@ -17,7 +17,7 @@ ) import sentry_sdk -from shared.metrics import metrics +from shared.metrics import Counter, Histogram, metrics logger = logging.getLogger(__name__) @@ -26,6 +26,45 @@ TSubflows: TypeAlias = Mapping[T, Iterable[tuple[str, T]]] +CHECKPOINTS_TOTAL_BEGUN = Counter( + "worker_checkpoints_begun", + "Total number of times a flow's first checkpoint was logged.", + ["flow"], +) +CHECKPOINTS_TOTAL_SUCCEEDED = Counter( + "worker_checkpoints_succeeded", + "Total number of times one of a flow's success checkpoints was logged.", + ["flow"], +) +CHECKPOINTS_TOTAL_FAILED = Counter( + "worker_checkpoints_failed", + "Total number of times one of a flow's failure checkpoints was logged.", + ["flow"], +) +CHECKPOINTS_TOTAL_ENDED = Counter( + "worker_checkpoints_ended", + "Total number of times one of a flow's terminal checkpoints (success or failure) was logged.", + ["flow"], +) +CHECKPOINTS_ERRORS = Counter( + "worker_checkpoints_errors", + "Total number of errors while trying to log checkpoints", + ["flow"], +) +CHECKPOINTS_EVENTS = Counter( + "worker_checkpoints_events", + "Total number of checkpoints logged.", + ["flow", "checkpoint"], +) + +CHECKPOINTS_SUBFLOW_DURATION = Histogram( + "worker_checkpoints_subflow_duration_seconds", + "Duration of subflows in seconds.", + ["flow", "subflow"], + buckets=[0.05, 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 180, 300, 600, 900], +) + + class BaseFlow(str, Enum): """ Base class for a flow. Defines optional functions which are added by the @@ -266,10 +305,12 @@ class MyEnum(str, Enum): def log_counters(obj: T) -> None: metrics.incr(f"{klass.__name__}.events.{obj.name}") + CHECKPOINTS_EVENTS.labels(flow=klass.__name__, checkpoint=obj.name).inc() # If this is the first checkpoint, increment the number of flows we've begun if obj == next(iter(klass.__members__.values())): metrics.incr(f"{klass.__name__}.total.begun") + CHECKPOINTS_TOTAL_BEGUN.labels(flow=klass.__name__).inc() return is_failure = hasattr(obj, "is_failure") and obj.is_failure() @@ -278,11 +319,14 @@ def log_counters(obj: T) -> None: if is_failure: metrics.incr(f"{klass.__name__}.total.failed") + CHECKPOINTS_TOTAL_FAILED.labels(flow=klass.__name__).inc() elif is_success: metrics.incr(f"{klass.__name__}.total.succeeded") + CHECKPOINTS_TOTAL_SUCCEEDED.labels(flow=klass.__name__).inc() if is_terminal: metrics.incr(f"{klass.__name__}.total.ended") + CHECKPOINTS_TOTAL_ENDED.labels(flow=klass.__name__).inc() klass.log_counters = log_counters return klass @@ -356,6 +400,7 @@ def _error(self: _Self, msg: str) -> None: # may have been enqueued by the old worker and be missing checkpoints # data. At least for that reason, we want to allow failing softly. 
metrics.incr("worker.checkpoint_logger.error") + CHECKPOINTS_ERRORS.labels(flow=self.cls.__name__).inc() if self.strict: raise ValueError(msg) else: @@ -422,6 +467,10 @@ def submit_subflow(self: _Self, metric: str, start: T, end: T) -> _Self: duration = self._subflow_duration(start, end) if duration: sentry_sdk.set_measurement(metric, duration, "milliseconds") + duration_in_seconds = duration / 1000 + CHECKPOINTS_SUBFLOW_DURATION.labels( + flow=self.cls.__name__, subflow=metric + ).observe(duration_in_seconds) return self diff --git a/helpers/tests/unit/test_checkpoint_logger.py b/helpers/tests/unit/test_checkpoint_logger.py index d5ec96b55..012fcd75a 100644 --- a/helpers/tests/unit/test_checkpoint_logger.py +++ b/helpers/tests/unit/test_checkpoint_logger.py @@ -5,6 +5,7 @@ import pytest import sentry_sdk +from prometheus_client import REGISTRY from shared.utils.test_utils import mock_metrics from helpers.checkpoint_logger import ( @@ -19,6 +20,42 @@ ) +class CounterAssertion: + def __init__(self, metric, labels, expected_value): + self.metric = metric + self.labels = labels + self.expected_value = expected_value + + self.before_value = None + self.after_value = None + + def __repr__(self): + return f"" + + +class CounterAssertionSet: + def __init__(self, counter_assertions): + self.counter_assertions = counter_assertions + + def __enter__(self): + for assertion in self.counter_assertions: + assertion.before_value = ( + REGISTRY.get_sample_value(assertion.metric, labels=assertion.labels) + or 0 + ) + + def __exit__(self, exc_type, exc_value, exc_tb): + for assertion in self.counter_assertions: + assertion.after_value = ( + REGISTRY.get_sample_value(assertion.metric, labels=assertion.labels) + or 0 + ) + assert ( + assertion.after_value - assertion.before_value + == assertion.expected_value + ) + + @failure_events("BRANCH_1_FAIL") @success_events("BRANCH_1_SUCCESS", "BRANCH_2_SUCCESS") @subflows( @@ -161,6 +198,13 @@ def test_submit_subflow(self, mock_sentry, mock_timestamp): expected_duration = 9001 - 1337 checkpoints.submit_subflow("metricname", TestEnum1.A, TestEnum1.B) mock_sentry.assert_called_with("metricname", expected_duration, "milliseconds") + assert ( + REGISTRY.get_sample_value( + "worker_checkpoints_subflow_duration_seconds_sum", + labels={"flow": "TestEnum1", "subflow": "metricname"}, + ) + == expected_duration / 1000 + ) @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337]) def test_log_ignore_repeat(self, mock_timestamp): @@ -311,7 +355,27 @@ def test_reliability_counters(self): checkpoints = CheckpointLogger(DecoratedEnum) - checkpoints.log(DecoratedEnum.BEGIN) + counter_assertions = [ + CounterAssertion( + "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "BEGIN"}, + 1, + ), + CounterAssertion( + "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 0 + ), + ] + with CounterAssertionSet(counter_assertions): + checkpoints.log(DecoratedEnum.BEGIN) assert metrics.data["DecoratedEnum.events.BEGIN"] == 1 assert metrics.data["DecoratedEnum.total.begun"] == 1 assert "DecoratedEnum.total.succeeded" not in metrics.data @@ -319,7 +383,32 @@ def test_reliability_counters(self): assert "DecoratedEnum.total.ended" not in metrics.data # Nothing special about 
`CHECKPOINT` - no counters should change - checkpoints.log(DecoratedEnum.CHECKPOINT) + counter_assertions = [ + CounterAssertion( + "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "BEGIN"}, + 0, + ), + CounterAssertion( + "worker_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "CHECKPOINT"}, + 1, + ), + CounterAssertion( + "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 0 + ), + ] + with CounterAssertionSet(counter_assertions): + checkpoints.log(DecoratedEnum.CHECKPOINT) assert metrics.data["DecoratedEnum.events.CHECKPOINT"] == 1 assert metrics.data["DecoratedEnum.total.begun"] == 1 assert "DecoratedEnum.total.succeeded" not in metrics.data @@ -327,7 +416,27 @@ def test_reliability_counters(self): assert "DecoratedEnum.total.ended" not in metrics.data # Failures should increment both `failed` and `ended` - checkpoints.log(DecoratedEnum.BRANCH_1_FAIL) + counter_assertions = [ + CounterAssertion( + "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "BRANCH_1_FAIL"}, + 1, + ), + ] + with CounterAssertionSet(counter_assertions): + checkpoints.log(DecoratedEnum.BRANCH_1_FAIL) assert metrics.data["DecoratedEnum.events.BRANCH_1_FAIL"] == 1 assert metrics.data["DecoratedEnum.total.begun"] == 1 assert metrics.data["DecoratedEnum.total.failed"] == 1 @@ -335,7 +444,27 @@ def test_reliability_counters(self): assert "DecoratedEnum.total.succeeded" not in metrics.data # Successes should increment both `succeeded` and `ended` - checkpoints.log(DecoratedEnum.BRANCH_1_SUCCESS) + counter_assertions = [ + CounterAssertion( + "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "BRANCH_1_SUCCESS"}, + 1, + ), + ] + with CounterAssertionSet(counter_assertions): + checkpoints.log(DecoratedEnum.BRANCH_1_SUCCESS) assert metrics.data["DecoratedEnum.events.BRANCH_1_SUCCESS"] == 1 assert metrics.data["DecoratedEnum.total.begun"] == 1 assert metrics.data["DecoratedEnum.total.failed"] == 1 @@ -343,7 +472,27 @@ def test_reliability_counters(self): assert metrics.data["DecoratedEnum.total.succeeded"] == 1 # A different success path should also increment `succeeded` and `ended` - checkpoints.log(DecoratedEnum.BRANCH_2_SUCCESS) + counter_assertions = [ + CounterAssertion( + "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0 + ), + CounterAssertion( + "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 0 + ), + 
CounterAssertion( + "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 1 + ), + CounterAssertion( + "worker_checkpoints_events_total", + {"flow": "DecoratedEnum", "checkpoint": "BRANCH_2_SUCCESS"}, + 1, + ), + ] + with CounterAssertionSet(counter_assertions): + checkpoints.log(DecoratedEnum.BRANCH_2_SUCCESS) assert metrics.data["DecoratedEnum.events.BRANCH_2_SUCCESS"] == 1 assert metrics.data["DecoratedEnum.total.begun"] == 1 assert metrics.data["DecoratedEnum.total.failed"] == 1 From 5c8b6fe9f8b1fe150d716bb551ec611d3290f48f Mon Sep 17 00:00:00 2001 From: Matt Hammerly Date: Wed, 25 Oct 2023 16:37:48 -0700 Subject: [PATCH 2/2] add checks to avoid metric pollution and log noise --- helpers/checkpoint_logger/__init__.py | 3 ++ tasks/notify.py | 42 +++++++++++----------- tasks/tests/unit/test_notify_task.py | 52 +++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 20 deletions(-) diff --git a/helpers/checkpoint_logger/__init__.py b/helpers/checkpoint_logger/__init__.py index d56a8e20c..b8f0cb878 100644 --- a/helpers/checkpoint_logger/__init__.py +++ b/helpers/checkpoint_logger/__init__.py @@ -443,6 +443,9 @@ def log( self.data[checkpoint] = _get_milli_timestamp() elif not ignore_repeat: self._error(f"Already recorded checkpoint {checkpoint}") + return self + else: + return self if kwargs is not None: kwargs[self.kwargs_key] = self.data diff --git a/tasks/notify.py b/tasks/notify.py index 99186f993..5956fc4d5 100644 --- a/tasks/notify.py +++ b/tasks/notify.py @@ -92,13 +92,27 @@ async def run_async( lock_acquired=lock_acquired, ), ), - checkpoints_from_kwargs(UploadFlow, kwargs).log(UploadFlow.NOTIF_LOCK_ERROR) + self.log_checkpoint(kwargs, UploadFlow.NOTIF_LOCK_ERROR) return { "notified": False, "notifications": None, "reason": "unobtainable_lock", } + def log_checkpoint(self, kwargs, checkpoint): + """ + Only log a checkpoint if whoever scheduled us sent checkpoints data from + the same flow. + + The notify task is an important part of `UploadFlow`, but it's also used + elsewhere. If this instance of the notify task wasn't scheduled as part + of upload processing, attempting to log `UploadFlow` checkpoints for it + will pollute our metrics. + """ + checkpoints = checkpoints_from_kwargs(checkpoint.__class__, kwargs) + if checkpoints.data: + checkpoints.log(checkpoint) + async def run_async_within_lock( self, db_session: Session, @@ -125,9 +139,7 @@ async def run_async_within_lock( "Unable to start notifications because repo doesn't have a valid bot", extra=dict(repoid=repoid, commit=commitid), ) - checkpoints_from_kwargs(UploadFlow, kwargs).log( - UploadFlow.NOTIF_NO_VALID_INTEGRATION - ) + self.log_checkpoint(kwargs, UploadFlow.NOTIF_NO_VALID_INTEGRATION) return {"notified": False, "notifications": None, "reason": "no_valid_bot"} if current_yaml is None: current_yaml = await get_current_yaml(commit, repository_service) @@ -143,9 +155,7 @@ async def run_async_within_lock( "Unable to fetch CI results due to a client problem. Not notifying user", extra=dict(repoid=commit.repoid, commit=commit.commitid, code=ex.code), ) - checkpoints_from_kwargs(UploadFlow, kwargs).log( - UploadFlow.NOTIF_GIT_CLIENT_ERROR - ) + self.log_checkpoint(kwargs, UploadFlow.NOTIF_GIT_CLIENT_ERROR) return { "notified": False, "notifications": None, @@ -156,9 +166,7 @@ async def run_async_within_lock( "Unable to fetch CI results due to server issues. 
Not notifying user", extra=dict(repoid=commit.repoid, commit=commit.commitid), ) - checkpoints_from_kwargs(UploadFlow, kwargs).log( - UploadFlow.NOTIF_GIT_SERVICE_ERROR - ) + self.log_checkpoint(kwargs, UploadFlow.NOTIF_GIT_SERVICE_ERROR) return { "notified": False, "notifications": None, @@ -189,9 +197,7 @@ async def run_async_within_lock( current_yaml=current_yaml.to_dict(), ), ) - checkpoints_from_kwargs(UploadFlow, kwargs).log( - UploadFlow.NOTIF_TOO_MANY_RETRIES - ) + self.log_checkpoint(kwargs, UploadFlow.NOTIF_TOO_MANY_RETRIES) return { "notified": False, "notifications": None, @@ -229,9 +235,7 @@ async def run_async_within_lock( pull_head=enriched_pull.provider_pull["head"]["commitid"], ), ) - checkpoints_from_kwargs(UploadFlow, kwargs).log( - UploadFlow.NOTIF_STALE_HEAD - ) + self.log_checkpoint(kwargs, UploadFlow.NOTIF_STALE_HEAD) return { "notified": False, "notifications": None, @@ -248,9 +252,7 @@ async def run_async_within_lock( commit, report_class=ReadOnlyReport ) if head_report is None and empty_upload is None: - checkpoints_from_kwargs(UploadFlow, kwargs).log( - UploadFlow.NOTIF_ERROR_NO_REPORT - ) + self.log_checkpoint(kwargs, UploadFlow.NOTIF_ERROR_NO_REPORT) return { "notified": False, "notifications": None, @@ -273,7 +275,7 @@ async def run_async_within_lock( enriched_pull, empty_upload, ) - checkpoints_from_kwargs(UploadFlow, kwargs).log(UploadFlow.NOTIFIED) + self.log_checkpoint(kwargs, UploadFlow.NOTIFIED) log.info( "Notifications done", extra=dict( diff --git a/tasks/tests/unit/test_notify_task.py b/tasks/tests/unit/test_notify_task.py index b7ae3089f..ceed37daf 100644 --- a/tasks/tests/unit/test_notify_task.py +++ b/tasks/tests/unit/test_notify_task.py @@ -927,3 +927,55 @@ async def test_run_async_can_run_logic(self, dbsession, mock_redis, mocker): empty_upload=None, **kwargs, ) + + @pytest.mark.asyncio + async def test_checkpoints_not_logged_outside_upload_flow( + self, dbsession, mock_redis, mocker, mock_checkpoint_submit, mock_configuration + ): + fake_notifier = mocker.MagicMock( + AbstractBaseNotifier, + is_enabled=mocker.MagicMock(return_value=True), + title="the_title", + notification_type=Notification.comment, + decoration_type=Decoration.standard, + ) + fake_notifier.name = "fake_hahaha" + fake_notifier.notify.return_value = NotificationResult( + notification_attempted=True, + notification_successful=True, + explanation="", + data_sent={"all": ["The", 1, "data"]}, + ) + mocker.patch.object( + NotificationService, "get_notifiers_instances", return_value=[fake_notifier] + ) + mock_configuration.params["setup"][ + "codecov_dashboard_url" + ] = "https://codecov.io" + mocker.patch.object(NotifyTask, "app") + mocker.patch.object(NotifyTask, "should_send_notifications", return_value=True) + fetch_and_update_whether_ci_passed_result = {} + mocker.patch.object( + NotifyTask, + "fetch_and_update_whether_ci_passed", + return_value=fetch_and_update_whether_ci_passed_result, + ) + mocked_fetch_pull = mocker.patch( + "tasks.notify.fetch_and_update_pull_request_information_from_commit" + ) + mocker.patch.object( + ReportService, "get_existing_report_for_commit", return_value=Report() + ) + mocked_fetch_pull.return_value = None + commit = CommitFactory.create(message="", pullid=None) + dbsession.add(commit) + dbsession.flush() + + task = NotifyTask() + result = await task.run_async_within_lock( + dbsession, + repoid=commit.repoid, + commitid=commit.commitid, + current_yaml={"coverage": {"status": {"patch": True}}}, + ) + assert not mock_checkpoint_submit.called