From 0705c36b4ec7f90a085f106f88d126e1290bc875 Mon Sep 17 00:00:00 2001
From: joseph-sentry <136376984+joseph-sentry@users.noreply.github.com>
Date: Wed, 19 Jun 2024 11:48:56 -0400
Subject: [PATCH] feat: add TestResultsFlow for measuring time to notification (#439)

Add a TestResultsFlow checkpoint flow. The upload task logs
TEST_RESULTS_BEGIN and forwards the checkpoint data to the test results
finisher, which logs TEST_RESULTS_FINISHER_BEGIN and TEST_RESULTS_NOTIFY.

The flaky test detection path logs FLAKE_DETECTION_NOTIFY (not
TEST_RESULTS_NOTIFY), because the flake_notification_latency subflow is
declared as TEST_RESULTS_BEGIN -> FLAKE_DETECTION_NOTIFY.

Signed-off-by: joseph-sentry
---
 helpers/checkpoint_logger/flows.py   | 20 +++++++++++++++++++
 tasks/test_results_finisher.py       | 12 +++++++++++-
 tasks/tests/unit/test_upload_task.py |  1 +
 tasks/upload.py                      | 29 ++++++++++++++++------------
 4 files changed, 49 insertions(+), 13 deletions(-)

diff --git a/helpers/checkpoint_logger/flows.py b/helpers/checkpoint_logger/flows.py
index de352f5e2..5e156e9c7 100644
--- a/helpers/checkpoint_logger/flows.py
+++ b/helpers/checkpoint_logger/flows.py
@@ -55,3 +55,23 @@ class UploadFlow(BaseFlow):
     NOTIF_TOO_MANY_RETRIES = auto()
     NOTIF_STALE_HEAD = auto()
     NOTIF_ERROR_NO_REPORT = auto()
+
+
+@failure_events("TEST_RESULTS_ERROR")
+@success_events("TEST_RESULTS_BEGIN")
+@subflows(
+    ("test_results_notification_latency", "TEST_RESULTS_BEGIN", "TEST_RESULTS_NOTIFY"),
+    ("flake_notification_latency", "TEST_RESULTS_BEGIN", "FLAKE_DETECTION_NOTIFY"),
+    (
+        "test_results_processing_time",
+        "TEST_RESULTS_BEGIN",
+        "TEST_RESULTS_FINISHER_BEGIN",
+    ),
+)
+@reliability_counters
+class TestResultsFlow(BaseFlow):
+    TEST_RESULTS_BEGIN = auto()
+    TEST_RESULTS_NOTIFY = auto()
+    FLAKE_DETECTION_NOTIFY = auto()
+    TEST_RESULTS_ERROR = auto()
+    TEST_RESULTS_FINISHER_BEGIN = auto()
diff --git a/tasks/test_results_finisher.py b/tasks/test_results_finisher.py
index 3b10b338f..438962cd6 100644
--- a/tasks/test_results_finisher.py
+++ b/tasks/test_results_finisher.py
@@ -10,6 +10,8 @@
 from app import celery_app
 from database.enums import FlakeSymptomType, ReportType, TestResultsProcessingError
 from database.models import Commit, TestResultReportTotals
+from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
+from helpers.checkpoint_logger.flows import TestResultsFlow
 from helpers.string import EscapeEnum, Replacement, StringEscaper, shorten_file_paths
 from rollouts import FLAKY_TEST_DETECTION
 from services.failure_normalizer import FailureNormalizer
@@ -135,6 +137,10 @@ def process_impl_within_lock(
             ),
         )
 
+        checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs)
+
+        checkpoints.log(TestResultsFlow.TEST_RESULTS_FINISHER_BEGIN)
+
         commit: Commit = (
             db_session.query(Commit).filter_by(repoid=repoid, commitid=commitid).first()
         )
@@ -270,6 +276,7 @@ def process_impl_within_lock(
         )
 
         with metrics.timing("test_results.finisher.notification"):
+            checkpoints.log(TestResultsFlow.TEST_RESULTS_NOTIFY)
             success, reason = async_to_sync(notifier.notify)(payload)
 
         log.info(
@@ -301,7 +308,7 @@ def process_impl_within_lock(
             )
             with metrics.timing("test_results.finisher.run_flaky_test_detection"):
                 success, reason = self.run_flaky_test_detection(
-                    db_session, repoid, notifier, payload
+                    db_session, repoid, notifier, payload, checkpoints=checkpoints
                 )
 
             metrics.incr(
@@ -321,6 +328,7 @@ def run_flaky_test_detection(
         repoid,
         notifier: TestResultsNotifier,
         payload: TestResultsNotificationPayload,
+        checkpoints=None,
     ):
         ignore_predefined = read_yaml_field(
             "test_analytics", "ignore_predefined", _else=False
@@ -375,6 +383,8 @@ def run_flaky_test_detection(
                 )
             db_session.flush()
 
+        if checkpoints:
+            checkpoints.log(TestResultsFlow.FLAKE_DETECTION_NOTIFY)
         success, reason = async_to_sync(notifier.notify)(payload)
         log.info(
             "Added flaky test information to the PR comment",
diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py
index 3edd24708..b363fd3eb 100644
--- a/tasks/tests/unit/test_upload_task.py
+++ b/tasks/tests/unit/test_upload_task.py
@@ -335,6 +335,7 @@ def test_upload_task_call_test_results(
                 repoid=commit.repoid,
                 commitid=commit.commitid,
                 commit_yaml={"codecov": {"max_report_age": "1y ago"}},
+                checkpoints_TestResultsFlow=None,
             )
         )
 
diff --git a/tasks/upload.py b/tasks/upload.py
index e45664468..eab16ae7f 100644
--- a/tasks/upload.py
+++ b/tasks/upload.py
@@ -26,7 +26,7 @@
 from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME
 from helpers.checkpoint_logger import _kwargs_key
 from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
-from helpers.checkpoint_logger.flows import UploadFlow
+from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow
 from helpers.exceptions import RepositoryWithoutValidBotError
 from helpers.github_installation import get_installation_name_for_owner_for_task
 from helpers.parallel_upload_processing import get_parallel_session_ids
@@ -258,6 +258,10 @@ def run_impl(
             checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs).log(
                 UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs, ignore_repeat=True
             )
+        elif report_type == ReportType.TEST_RESULTS.value:
+            checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs).log(
+                TestResultsFlow.TEST_RESULTS_BEGIN, kwargs=kwargs, ignore_repeat=True
+            )
 
         repoid = int(repoid)
         log.info(
@@ -628,7 +632,7 @@ def schedule_task(
             )
         elif commit_report.report_type == ReportType.TEST_RESULTS.value:
             res = self._schedule_test_results_processing_task(
-                commit, commit_yaml, argument_list, commit_report
+                commit, commit_yaml, argument_list, commit_report, checkpoints
             )
 
         if res:
@@ -850,11 +854,7 @@ def _schedule_bundle_analysis_processing_task(
         return res
 
     def _schedule_test_results_processing_task(
-        self,
-        commit,
-        commit_yaml,
-        argument_list,
-        commit_report,
+        self, commit, commit_yaml, argument_list, commit_report, checkpoints=None
     ):
         processor_task_group = []
         for i in range(0, len(argument_list), CHUNK_SIZE):
@@ -872,15 +872,20 @@ def _schedule_test_results_processing_task(
             )
             processor_task_group.append(sig)
         if processor_task_group:
+            checkpoint_data = None
+            if checkpoints:
+                checkpoint_data = checkpoints.data
+            kwargs = {
+                "repoid": commit.repoid,
+                "commitid": commit.commitid,
+                "commit_yaml": commit_yaml,
+                _kwargs_key(TestResultsFlow): checkpoint_data,
+            }
             res = chord(
                 processor_task_group,
                 test_results_finisher_task.signature(
                     args=(),
-                    kwargs=dict(
-                        repoid=commit.repoid,
-                        commitid=commit.commitid,
-                        commit_yaml=commit_yaml,
-                    ),
+                    kwargs=kwargs,
                 ),
             ).apply_async()