From 3d332896335bd81a15084e2d4d12dea8fa7ae536 Mon Sep 17 00:00:00 2001 From: joseph-sentry Date: Wed, 8 May 2024 10:45:35 -0400 Subject: [PATCH] feat: add TestResultsFlow for measuring time to notification Signed-off-by: joseph-sentry --- helpers/checkpoint_logger/flows.py | 14 +++++++++++++ tasks/test_results_finisher.py | 11 +++++++++- tasks/tests/unit/test_upload_task.py | 1 + tasks/upload.py | 30 ++++++++++++++++------------ 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/helpers/checkpoint_logger/flows.py b/helpers/checkpoint_logger/flows.py index de352f5e2..98394ac75 100644 --- a/helpers/checkpoint_logger/flows.py +++ b/helpers/checkpoint_logger/flows.py @@ -55,3 +55,17 @@ class UploadFlow(BaseFlow): NOTIF_TOO_MANY_RETRIES = auto() NOTIF_STALE_HEAD = auto() NOTIF_ERROR_NO_REPORT = auto() + + +@failure_events("TEST_RESULTS_ERROR") +@success_events("TEST_RESULTS_BEGIN") +@subflows( + ("notification_latency", "TEST_RESULTS_BEGIN", "TEST_RESULTS_NOTIFY"), + ("notification_latency", "TEST_RESULTS_BEGIN", "FLAKE_DETECTION_NOTIFY"), +) +@reliability_counters +class TestResultsFlow(BaseFlow): + TEST_RESULTS_BEGIN = auto() + TEST_RESULTS_NOTIFY = auto() + FLAKE_DETECTION_NOTIFY = auto() + TEST_RESULTS_ERROR = auto() diff --git a/tasks/test_results_finisher.py b/tasks/test_results_finisher.py index 44b336960..5cb46d63e 100644 --- a/tasks/test_results_finisher.py +++ b/tasks/test_results_finisher.py @@ -11,6 +11,8 @@ from app import celery_app from database.enums import FlakeSymptomType, ReportType, TestResultsProcessingError from database.models import Commit, TestResultReportTotals +from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs +from helpers.checkpoint_logger.flows import TestResultsFlow from helpers.string import EscapeEnum, Replacement, StringEscaper, shorten_file_paths from rollouts import FLAKY_TEST_DETECTION from services.failure_normalizer import FailureNormalizer @@ -67,6 +69,7 @@ def run_impl( commit_yaml: dict, **kwargs, ): + repoid = int(repoid) commit_yaml = UserYaml.from_dict(commit_yaml) @@ -136,6 +139,8 @@ def process_impl_within_lock( ), ) + checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs) + commit: Commit = ( db_session.query(Commit).filter_by(repoid=repoid, commitid=commitid).first() ) @@ -271,6 +276,7 @@ def process_impl_within_lock( ) with metrics.timing("test_results.finisher.notification"): + checkpoints.log(TestResultsFlow.TEST_RESULTS_NOTIFY) success, reason = async_to_sync(notifier.notify)(payload) log.info( @@ -302,7 +308,7 @@ def process_impl_within_lock( ) with metrics.timing("test_results.finisher.run_flaky_test_detection"): success, reason = self.run_flaky_test_detection( - db_session, repoid, notifier, payload + db_session, repoid, notifier, payload, checkpoints=checkpoints ) metrics.incr( @@ -322,6 +328,7 @@ def run_flaky_test_detection( repoid, notifier: TestResultsNotifier, payload: TestResultsNotificationPayload, + checkpoints=None, ): ignore_predefined = read_yaml_field( "test_analytics", "ignore_predefined", _else=False @@ -376,6 +383,8 @@ def run_flaky_test_detection( ) db_session.flush() + if checkpoints: + checkpoints.log(TestResultsFlow.TEST_RESULTS_NOTIFY) success, reason = async_to_sync(notifier.notify)(payload) log.info( "Added flaky test information to the PR comment", diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py index 66dc2ae6f..ce27a758d 100644 --- a/tasks/tests/unit/test_upload_task.py +++ b/tasks/tests/unit/test_upload_task.py @@ -335,6 +335,7 @@ def test_upload_task_call_test_results( repoid=commit.repoid, commitid=commit.commitid, commit_yaml={"codecov": {"max_report_age": "1y ago"}}, + checkpoints_TestResultsFlow=None, ) ) diff --git a/tasks/upload.py b/tasks/upload.py index 1f869bdef..92bfb8bcb 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -23,7 +23,7 @@ from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME from helpers.checkpoint_logger import _kwargs_key from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs -from helpers.checkpoint_logger.flows import UploadFlow +from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task from helpers.metrics import metrics @@ -258,6 +258,10 @@ def run_impl( checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs).log( UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs, ignore_repeat=True ) + elif report_type == ReportType.TEST_RESULTS.value: + checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs).log( + TestResultsFlow.TEST_RESULTS_BEGIN, kwargs=kwargs, ignore_repeat=True + ) repoid = int(repoid) log.info( @@ -628,7 +632,7 @@ def schedule_task( ) elif commit_report.report_type == ReportType.TEST_RESULTS.value: res = self._schedule_test_results_processing_task( - commit, commit_yaml, argument_list, commit_report + commit, commit_yaml, argument_list, commit_report, checkpoints ) if res: @@ -850,11 +854,7 @@ def _schedule_bundle_analysis_processing_task( return res def _schedule_test_results_processing_task( - self, - commit, - commit_yaml, - argument_list, - commit_report, + self, commit, commit_yaml, argument_list, commit_report, checkpoints=None ): processor_task_group = [] for i in range(0, len(argument_list), CHUNK_SIZE): @@ -872,16 +872,20 @@ def _schedule_test_results_processing_task( ) processor_task_group.append(sig) if processor_task_group: - + checkpoint_data = None + if checkpoints: + checkpoint_data = checkpoints.data + kwargs = { + "repoid": commit.repoid, + "commitid": commit.commitid, + "commit_yaml": commit_yaml, + _kwargs_key(TestResultsFlow): checkpoint_data, + } res = chord( processor_task_group, test_results_finisher_task.signature( args=(), - kwargs=dict( - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml=commit_yaml, - ), + kwargs=kwargs, ), ).apply_async()