diff --git a/helpers/checkpoint_logger/flows.py b/helpers/checkpoint_logger/flows.py index de352f5e2..98394ac75 100644 --- a/helpers/checkpoint_logger/flows.py +++ b/helpers/checkpoint_logger/flows.py @@ -55,3 +55,17 @@ class UploadFlow(BaseFlow): NOTIF_TOO_MANY_RETRIES = auto() NOTIF_STALE_HEAD = auto() NOTIF_ERROR_NO_REPORT = auto() + + +@failure_events("TEST_RESULTS_ERROR") +@success_events("TEST_RESULTS_BEGIN") +@subflows( + ("notification_latency", "TEST_RESULTS_BEGIN", "TEST_RESULTS_NOTIFY"), + ("notification_latency", "TEST_RESULTS_BEGIN", "FLAKE_DETECTION_NOTIFY"), +) +@reliability_counters +class TestResultsFlow(BaseFlow): + TEST_RESULTS_BEGIN = auto() + TEST_RESULTS_NOTIFY = auto() + FLAKE_DETECTION_NOTIFY = auto() + TEST_RESULTS_ERROR = auto() diff --git a/tasks/test_results_finisher.py b/tasks/test_results_finisher.py index 3b10b338f..c303423f9 100644 --- a/tasks/test_results_finisher.py +++ b/tasks/test_results_finisher.py @@ -10,6 +10,8 @@ from app import celery_app from database.enums import FlakeSymptomType, ReportType, TestResultsProcessingError from database.models import Commit, TestResultReportTotals +from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs +from helpers.checkpoint_logger.flows import TestResultsFlow from helpers.string import EscapeEnum, Replacement, StringEscaper, shorten_file_paths from rollouts import FLAKY_TEST_DETECTION from services.failure_normalizer import FailureNormalizer @@ -66,6 +68,7 @@ def run_impl( commit_yaml: dict, **kwargs, ): + repoid = int(repoid) commit_yaml = UserYaml.from_dict(commit_yaml) @@ -135,6 +138,8 @@ def process_impl_within_lock( ), ) + checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs) + commit: Commit = ( db_session.query(Commit).filter_by(repoid=repoid, commitid=commitid).first() ) @@ -270,6 +275,7 @@ def process_impl_within_lock( ) with metrics.timing("test_results.finisher.notification"): + checkpoints.log(TestResultsFlow.TEST_RESULTS_NOTIFY) success, reason = async_to_sync(notifier.notify)(payload) log.info( @@ -301,7 +307,7 @@ def process_impl_within_lock( ) with metrics.timing("test_results.finisher.run_flaky_test_detection"): success, reason = self.run_flaky_test_detection( - db_session, repoid, notifier, payload + db_session, repoid, notifier, payload, checkpoints=checkpoints ) metrics.incr( @@ -321,6 +327,7 @@ def run_flaky_test_detection( repoid, notifier: TestResultsNotifier, payload: TestResultsNotificationPayload, + checkpoints=None, ): ignore_predefined = read_yaml_field( "test_analytics", "ignore_predefined", _else=False @@ -375,6 +382,8 @@ def run_flaky_test_detection( ) db_session.flush() + if checkpoints: + checkpoints.log(TestResultsFlow.TEST_RESULTS_NOTIFY) success, reason = async_to_sync(notifier.notify)(payload) log.info( "Added flaky test information to the PR comment", diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py index 97df4c136..c7e4f4308 100644 --- a/tasks/tests/unit/test_upload_task.py +++ b/tasks/tests/unit/test_upload_task.py @@ -134,9 +134,9 @@ def test_upload_task_call( dbsession.flush() dbsession.refresh(commit) repo_updatestamp = commit.repository.updatestamp - mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = ( - jsonified_redis_queue - ) + mock_redis.lists[ + f"uploads/{commit.repoid}/{commit.commitid}" + ] = jsonified_redis_queue checkpoints = _create_checkpoint_logger(mocker) kwargs = {_kwargs_key(UploadFlow): checkpoints.data} result = UploadTask().run_impl( @@ -299,9 +299,9 @@ def test_upload_task_call_test_results( dbsession.flush() dbsession.refresh(commit) - mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}/test_results"] = ( - jsonified_redis_queue - ) + mock_redis.lists[ + f"uploads/{commit.repoid}/{commit.commitid}/test_results" + ] = jsonified_redis_queue UploadTask().run_impl( dbsession, @@ -335,6 +335,7 @@ def test_upload_task_call_test_results( repoid=commit.repoid, commitid=commit.commitid, commit_yaml={"codecov": {"max_report_age": "1y ago"}}, + checkpoints_TestResultsFlow=None, ) ) @@ -408,9 +409,9 @@ def test_upload_task_upload_processing_delay_not_enough_delay( ) dbsession.add(commit) dbsession.flush() - mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = ( - jsonified_redis_queue - ) + mock_redis.lists[ + f"uploads/{commit.repoid}/{commit.commitid}" + ] = jsonified_redis_queue mock_redis.keys[f"latest_upload/{commit.repoid}/{commit.commitid}"] = ( datetime.utcnow() - timedelta(seconds=10) ).timestamp() @@ -548,9 +549,9 @@ def test_upload_task_call_multiple_processors( ) dbsession.add(commit) dbsession.flush() - mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = ( - jsonified_redis_queue - ) + mock_redis.lists[ + f"uploads/{commit.repoid}/{commit.commitid}" + ] = jsonified_redis_queue result = UploadTask().run_impl(dbsession, commit.repoid, commit.commitid) expected_result = {"was_setup": False, "was_updated": True} assert expected_result == result @@ -663,9 +664,9 @@ def test_upload_task_proper_parent( dbsession.flush() redis_queue = [{"build": "part1"}] jsonified_redis_queue = [json.dumps(x) for x in redis_queue] - mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = ( - jsonified_redis_queue - ) + mock_redis.lists[ + f"uploads/{commit.repoid}/{commit.commitid}" + ] = jsonified_redis_queue result = UploadTask().run_impl(dbsession, commit.repoid, commit.commitid) expected_result = {"was_setup": False, "was_updated": True} assert expected_result == result @@ -710,9 +711,9 @@ def test_upload_task_no_bot( ) dbsession.add(commit) dbsession.flush() - mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = ( - jsonified_redis_queue - ) + mock_redis.lists[ + f"uploads/{commit.repoid}/{commit.commitid}" + ] = jsonified_redis_queue result = UploadTask().run_impl(dbsession, commit.repoid, commit.commitid) expected_result = {"was_setup": False, "was_updated": False} assert expected_result == result @@ -765,9 +766,9 @@ def test_upload_task_bot_no_permissions( ) dbsession.add(commit) dbsession.flush() - mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = ( - jsonified_redis_queue - ) + mock_redis.lists[ + f"uploads/{commit.repoid}/{commit.commitid}" + ] = jsonified_redis_queue result = UploadTask().run_impl(dbsession, commit.repoid, commit.commitid) expected_result = {"was_setup": False, "was_updated": False} assert expected_result == result @@ -820,9 +821,9 @@ def test_upload_task_bot_unauthorized( mock_repo_provider.data = dict(repo=dict(repoid=commit.repoid)) dbsession.add(commit) dbsession.flush() - mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = ( - jsonified_redis_queue - ) + mock_redis.lists[ + f"uploads/{commit.repoid}/{commit.commitid}" + ] = jsonified_redis_queue upload_args = UploadContext( repoid=commit.repoid, commitid=commit.commitid, @@ -920,9 +921,9 @@ def fail_if_try_to_create_upload(*args, **kwargs): {"build": "part1", "url": "url1", "upload_id": upload.id_}, ] jsonified_redis_queue = [json.dumps(x) for x in redis_queue] - mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = ( - jsonified_redis_queue - ) + mock_redis.lists[ + f"uploads/{commit.repoid}/{commit.commitid}" + ] = jsonified_redis_queue upload_args = UploadContext( repoid=commit.repoid, commitid=commit.commitid, diff --git a/tasks/upload.py b/tasks/upload.py index d6f78f7fe..6d67a1987 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -26,7 +26,7 @@ from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME from helpers.checkpoint_logger import _kwargs_key from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs -from helpers.checkpoint_logger.flows import UploadFlow +from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task from helpers.parallel_upload_processing import get_parallel_session_ids @@ -259,6 +259,10 @@ def run_impl( checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs).log( UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs, ignore_repeat=True ) + elif report_type == ReportType.TEST_RESULTS.value: + checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs).log( + TestResultsFlow.TEST_RESULTS_BEGIN, kwargs=kwargs, ignore_repeat=True + ) repoid = int(repoid) log.info( @@ -629,7 +633,7 @@ def schedule_task( ) elif commit_report.report_type == ReportType.TEST_RESULTS.value: res = self._schedule_test_results_processing_task( - commit, commit_yaml, argument_list, commit_report + commit, commit_yaml, argument_list, commit_report, checkpoints ) if res: @@ -851,11 +855,7 @@ def _schedule_bundle_analysis_processing_task( return res def _schedule_test_results_processing_task( - self, - commit, - commit_yaml, - argument_list, - commit_report, + self, commit, commit_yaml, argument_list, commit_report, checkpoints=None ): processor_task_group = [] for i in range(0, len(argument_list), CHUNK_SIZE): @@ -873,15 +873,20 @@ def _schedule_test_results_processing_task( ) processor_task_group.append(sig) if processor_task_group: + checkpoint_data = None + if checkpoints: + checkpoint_data = checkpoints.data + kwargs = { + "repoid": commit.repoid, + "commitid": commit.commitid, + "commit_yaml": commit_yaml, + _kwargs_key(TestResultsFlow): checkpoint_data, + } res = chord( processor_task_group, test_results_finisher_task.signature( args=(), - kwargs=dict( - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml=commit_yaml, - ), + kwargs=kwargs, ), ).apply_async()