From 716078f2995bfe92945e03d9862ec4aa5e13c791 Mon Sep 17 00:00:00 2001
From: Matt Hammerly
Date: Tue, 1 Aug 2023 16:30:32 -0700
Subject: [PATCH 1/2] feat: add "flow logger" + instrument upload flow with it

---
 conftest.py                                   |  25 +++-
 helpers/checkpoint_logger.py                  |  55 ++++++++
 helpers/tests/unit/test_checkpoint_logger.py  | 123 ++++++++++++++++++
 pytest.ini                                    |   3 +-
 tasks/notify.py                               |  12 +-
 tasks/tests/unit/test_notify_task.py          |  46 ++++++-
 tasks/tests/unit/test_upload_finisher_task.py |  55 +++++++-
 tasks/tests/unit/test_upload_task.py          |  55 +++++++-
 tasks/upload.py                               |  50 ++++++-
 tasks/upload_finisher.py                      |  22 ++++
 10 files changed, 428 insertions(+), 18 deletions(-)
 create mode 100644 helpers/checkpoint_logger.py
 create mode 100644 helpers/tests/unit/test_checkpoint_logger.py

diff --git a/conftest.py b/conftest.py
index 9c364a793..4068694a1 100644
--- a/conftest.py
+++ b/conftest.py
@@ -103,7 +103,6 @@ def dbsession(db, engine):
     @event.listens_for(session, "after_transaction_end")
     def restart_savepoint(session, transaction):
         if transaction.nested and not transaction._parent.nested:
-
             # ensure that state is expired the way
             # session.commit() at the top level normally does
             # (optional step)
@@ -254,3 +253,27 @@ def with_sql_functions(dbsession):
                 $$ language sql stable strict;"""
     )
     dbsession.flush()
+
+
+# We don't want any tests submitting checkpoint logs to Sentry for real
+@pytest.fixture(autouse=True)
+def mock_checkpoint_submit(mocker, request):
+    # We mock sentry differently in the tests for CheckpointLogger
+    if request.node.get_closest_marker("real_checkpoint_logger"):
+        return
+
+    def mock_submit_fn(metric, start, end):
+        pass
+
+    mock_submit = mocker.Mock()
+    mock_submit.side_effect = mock_submit_fn
+
+    import helpers
+
+    mocker.patch.object(
+        helpers.checkpoint_logger.CheckpointLogger,
+        "submit_subflow",
+        mock_submit,
+    )
+
+    return mock_submit
diff --git a/helpers/checkpoint_logger.py b/helpers/checkpoint_logger.py
new file mode 100644
index 000000000..990bcd172
--- /dev/null
+++ b/helpers/checkpoint_logger.py
@@ -0,0 +1,55 @@
+import time
+from enum import Enum, auto
+
+import sentry_sdk
+
+
+class UploadFlow(Enum):
+    UPLOAD_TASK_BEGIN = auto()
+    PROCESSING_BEGIN = auto()
+    INITIAL_PROCESSING_COMPLETE = auto()
+    BATCH_PROCESSING_COMPLETE = auto()
+    PROCESSING_COMPLETE = auto()
+    NOTIFIED = auto()
+
+
+def _get_milli_timestamp():
+    return time.time_ns() // 1000000
+
+
+class CheckpointLogger:
+    def __init__(self, cls):
+        self.cls = cls
+        self.data = {}
+
+    def _validate_checkpoint(self, checkpoint):
+        if checkpoint.__class__ != self.cls:
+            raise ValueError(
+                f"Checkpoint {checkpoint} not part of flow `{self.cls.__name__}`"
+            )
+
+    def _subflow_duration(self, start, end):
+        self._validate_checkpoint(start)
+        self._validate_checkpoint(end)
+        if start not in self.data:
+            raise ValueError(
+                f"Cannot compute duration; missing start checkpoint {start}"
+            )
+        elif end not in self.data:
+            raise ValueError(f"Cannot compute duration; missing end checkpoint {end}")
+        elif end.value <= start.value:
+            raise ValueError(
+                f"Cannot compute duration; end {end} is not after start {start}"
+            )
+
+        return self.data[end] - self.data[start]
+
+    def log(self, checkpoint):
+        if checkpoint in self.data:
+            raise ValueError(f"Already recorded checkpoint {checkpoint}")
+        self._validate_checkpoint(checkpoint)
+        self.data[checkpoint] = _get_milli_timestamp()
+
+    def submit_subflow(self, metric, start, end):
+        duration = self._subflow_duration(start, end)
+        sentry_sdk.set_measurement(metric, duration, "milliseconds")
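
Reviewer note: for anyone skimming the diff, the new module above is driven as "log a checkpoint, then submit a named subflow between two checkpoints". A minimal usage sketch, illustrative only; all names (the enum members and the "time_before_processing" metric) are taken from this patch, and the surrounding task body is elided:

    from helpers.checkpoint_logger import CheckpointLogger, UploadFlow

    checkpoints = CheckpointLogger(UploadFlow)
    checkpoints.log(UploadFlow.UPLOAD_TASK_BEGIN)  # records a millisecond timestamp

    # ... later, when processing actually starts ...
    checkpoints.log(UploadFlow.PROCESSING_BEGIN)

    # emits a Sentry measurement for the elapsed time between the two checkpoints
    checkpoints.submit_subflow(
        "time_before_processing",
        UploadFlow.UPLOAD_TASK_BEGIN,
        UploadFlow.PROCESSING_BEGIN,
    )

Each log() call stores a millisecond timestamp keyed by the enum member, and submit_subflow() forwards the difference to sentry_sdk.set_measurement().
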
"milliseconds") diff --git a/helpers/tests/unit/test_checkpoint_logger.py b/helpers/tests/unit/test_checkpoint_logger.py new file mode 100644 index 000000000..c2a97a7a4 --- /dev/null +++ b/helpers/tests/unit/test_checkpoint_logger.py @@ -0,0 +1,123 @@ +import unittest +from enum import Enum, auto +from unittest.mock import ANY, patch + +import pytest +import sentry_sdk + +from helpers.checkpoint_logger import CheckpointLogger, _get_milli_timestamp + + +class TestEnum1(Enum): + A = auto() + B = auto() + C = auto() + + +class TestEnum2(Enum): + A = auto() + B = auto() + C = auto() + + +class TestCheckpointLogger(unittest.TestCase): + @patch("time.time_ns", return_value=123456789) + def test_get_milli_timestamp(self, mocker): + expected_ms = 123456789 // 1000000 + self.assertEqual(_get_milli_timestamp(), expected_ms) + + @patch("helpers.checkpoint_logger._get_milli_timestamp", return_value=1337) + def test_log_checkpoint(self, mocker): + checkpoints = CheckpointLogger(TestEnum1) + checkpoints.log(TestEnum1.A) + + self.assertEqual(checkpoints.data[TestEnum1.A], 1337) + + @patch( + "helpers.checkpoint_logger._get_milli_timestamp", + side_effect=[1337, 9001, 100000], + ) + def test_log_multiple_checkpoints(self, mocker): + checkpoints = CheckpointLogger(TestEnum1) + checkpoints.log(TestEnum1.A) + checkpoints.log(TestEnum1.B) + checkpoints.log(TestEnum1.C) + + self.assertEqual(checkpoints.data[TestEnum1.A], 1337) + self.assertEqual(checkpoints.data[TestEnum1.B], 9001) + self.assertEqual(checkpoints.data[TestEnum1.C], 100000) + + def test_log_checkpoint_twice_throws(self): + checkpoints = CheckpointLogger(TestEnum1) + checkpoints.log(TestEnum1.A) + + with self.assertRaises(ValueError): + checkpoints.log(TestEnum1.A) + + def test_log_checkpoint_wrong_enum_throws(self): + checkpoints = CheckpointLogger(TestEnum1) + + with self.assertRaises(ValueError): + checkpoints.log(TestEnum2.A) + + @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001]) + def test_subflow_duration(self, mocker): + checkpoints = CheckpointLogger(TestEnum1) + checkpoints.log(TestEnum1.A) + checkpoints.log(TestEnum1.B) + + duration = checkpoints._subflow_duration(TestEnum1.A, TestEnum1.B) + self.assertEqual(duration, 9001 - 1337) + + @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001]) + def test_subflow_duration_missing_checkpoints(self, mocker): + checkpoints = CheckpointLogger(TestEnum1) + checkpoints.log(TestEnum1.A) + checkpoints.log(TestEnum1.C) + + # Missing end checkpoint + with self.assertRaises(ValueError): + checkpoints._subflow_duration(TestEnum1.A, TestEnum1.B) + + # Missing start checkpoint + with self.assertRaises(ValueError): + checkpoints._subflow_duration(TestEnum1.B, TestEnum1.C) + + @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001]) + def test_subflow_duration_wrong_order(self, mocker): + checkpoints = CheckpointLogger(TestEnum1) + checkpoints.log(TestEnum1.A) + checkpoints.log(TestEnum1.B) + + # End < start + with self.assertRaises(ValueError): + checkpoints._subflow_duration(TestEnum1.B, TestEnum1.A) + + # End == start + with self.assertRaises(ValueError): + checkpoints._subflow_duration(TestEnum1.A, TestEnum1.A) + + @patch("helpers.checkpoint_logger._get_milli_timestamp", return_value=1337) + def test_subflow_duration_wrong_enum(self, mocker): + checkpoints = CheckpointLogger(TestEnum1) + checkpoints.log(TestEnum1.A) + + # Wrong enum for start checkpoint + with self.assertRaises(ValueError): + 
checkpoints._subflow_duration(TestEnum2.A, TestEnum1.A) + + # Wrong enum for end checkpoint + with self.assertRaises(ValueError): + checkpoints._subflow_duration(TestEnum1.A, TestEnum2.A) + + @pytest.mark.real_checkpoint_logger + @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001]) + @patch("sentry_sdk.set_measurement") + def test_submit_subflow(self, mock_sentry, mock_timestamp): + checkpoints = CheckpointLogger(TestEnum1) + checkpoints.log(TestEnum1.A) + checkpoints.log(TestEnum1.B) + + expected_duration = 9001 - 1337 + checkpoints.submit_subflow("metricname", TestEnum1.A, TestEnum1.B) + mock_sentry.assert_called_with("metricname", expected_duration, "milliseconds") diff --git a/pytest.ini b/pytest.ini index 231f1b9e9..a73b9a96d 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,5 @@ [pytest] addopts = --sqlalchemy-connect-url="postgresql://postgres@postgres:5432/background_test" markers= - integration: integration tests (includes tests with vcrs) \ No newline at end of file + integration: integration tests (includes tests with vcrs) + real_checkpoint_logger: prevents use of stubbed CheckpointLogger diff --git a/tasks/notify.py b/tasks/notify.py index 4a360551e..61f31c107 100644 --- a/tasks/notify.py +++ b/tasks/notify.py @@ -15,6 +15,7 @@ from app import celery_app from database.enums import CommitErrorTypes, Decoration from database.models import Commit, Pull +from helpers.checkpoint_logger import UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.save_commit_error import save_commit_error from services.activation import activate_user @@ -37,7 +38,6 @@ class NotifyTask(BaseCodecovTask): - name = notify_task_name throws = (SoftTimeLimitExceeded,) @@ -50,6 +50,7 @@ async def run_async( commitid: str, current_yaml=None, empty_upload=None, + checkpoints=None, **kwargs, ): redis_connection = get_redis_connection() @@ -80,6 +81,7 @@ async def run_async( commitid=commitid, current_yaml=current_yaml, empty_upload=empty_upload, + checkpoints=checkpoints, **kwargs, ) except LockError as err: @@ -106,6 +108,7 @@ async def run_async_within_lock( commitid: str, current_yaml=None, empty_upload=None, + checkpoints=None, **kwargs, ): log.info("Starting notifications", extra=dict(commit=commitid, repoid=repoid)) @@ -234,6 +237,13 @@ async def run_async_within_lock( enriched_pull, empty_upload, ) + if checkpoints: + checkpoints.log(UploadFlow.NOTIFIED) + checkpoints.submit_subflow( + "notification_latency", + UploadFlow.UPLOAD_TASK_BEGIN, + UploadFlow.NOTIFIED, + ) log.info( "Notifications done", extra=dict( diff --git a/tasks/tests/unit/test_notify_task.py b/tasks/tests/unit/test_notify_task.py index 75d228f64..37c5d8bfc 100644 --- a/tasks/tests/unit/test_notify_task.py +++ b/tasks/tests/unit/test_notify_task.py @@ -1,3 +1,5 @@ +from unittest.mock import call + import pytest from celery.exceptions import MaxRetriesExceededError, Retry from redis.exceptions import LockError @@ -16,6 +18,7 @@ PullFactory, RepositoryFactory, ) +from helpers.checkpoint_logger import CheckpointLogger, UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from services.decoration import DecorationDetails from services.notification import NotificationService @@ -28,6 +31,20 @@ from tasks.notify import NotifyTask +def _create_checkpoint_logger(mocker): + mocker.patch( + "helpers.checkpoint_logger._get_milli_timestamp", + side_effect=[1337, 9001, 10000, 15000, 20000, 25000], + ) + checkpoints = CheckpointLogger(UploadFlow) + 
checkpoints.log(UploadFlow.UPLOAD_TASK_BEGIN) + checkpoints.log(UploadFlow.PROCESSING_BEGIN) + checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE) + checkpoints.log(UploadFlow.BATCH_PROCESSING_COMPLETE) + checkpoints.log(UploadFlow.PROCESSING_COMPLETE) + return checkpoints + + @pytest.fixture def enriched_pull(dbsession): repository = RepositoryFactory.create( @@ -297,7 +314,12 @@ async def test_simple_call_no_notifications_no_yaml_given( @pytest.mark.asyncio async def test_simple_call_yes_notifications_no_base( - self, dbsession, mocker, mock_storage, mock_configuration + self, + dbsession, + mocker, + mock_storage, + mock_configuration, + mock_checkpoint_submit, ): fake_notifier = mocker.MagicMock( AbstractBaseNotifier, @@ -337,12 +359,16 @@ async def test_simple_call_yes_notifications_no_base( commit = CommitFactory.create(message="", pullid=None) dbsession.add(commit) dbsession.flush() + + checkpoints = _create_checkpoint_logger(mocker) + task = NotifyTask() result = await task.run_async_within_lock( dbsession, repoid=commit.repoid, commitid=commit.commitid, current_yaml={"coverage": {"status": {"patch": True}}}, + checkpoints=checkpoints, ) print(result) expected_result = { @@ -369,6 +395,23 @@ async def test_simple_call_yes_notifications_no_base( dbsession.refresh(commit) assert commit.notified is True + assert checkpoints.data == { + UploadFlow.UPLOAD_TASK_BEGIN: 1337, + UploadFlow.PROCESSING_BEGIN: 9001, + UploadFlow.INITIAL_PROCESSING_COMPLETE: 10000, + UploadFlow.BATCH_PROCESSING_COMPLETE: 15000, + UploadFlow.PROCESSING_COMPLETE: 20000, + UploadFlow.NOTIFIED: 25000, + } + calls = [ + call( + "notification_latency", + UploadFlow.UPLOAD_TASK_BEGIN, + UploadFlow.NOTIFIED, + ), + ] + mock_checkpoint_submit.assert_has_calls(calls) + @pytest.mark.asyncio async def test_simple_call_no_pullrequest_found( self, dbsession, mocker, mock_storage, mock_configuration @@ -818,4 +861,5 @@ async def test_run_async_can_run_logic(self, dbsession, mock_redis, mocker): commitid=commit.commitid, current_yaml=current_yaml, empty_upload=None, + checkpoints=mocker.ANY, ) diff --git a/tasks/tests/unit/test_upload_finisher_task.py b/tasks/tests/unit/test_upload_finisher_task.py index 7dab8901a..3f19629ff 100644 --- a/tasks/tests/unit/test_upload_finisher_task.py +++ b/tasks/tests/unit/test_upload_finisher_task.py @@ -1,5 +1,6 @@ import json from pathlib import Path +from unittest.mock import ANY, call import pytest from shared.yaml import UserYaml @@ -11,11 +12,24 @@ RepositoryFactory, UploadFactory, ) +from helpers.checkpoint_logger import CheckpointLogger, UploadFlow from tasks.upload_finisher import ReportService, UploadFinisherTask here = Path(__file__) +def _create_checkpoint_logger(mocker): + mocker.patch( + "helpers.checkpoint_logger._get_milli_timestamp", + side_effect=[1337, 9001, 10000, 15000, 20000, 25000], + ) + checkpoints = CheckpointLogger(UploadFlow) + checkpoints.log(UploadFlow.UPLOAD_TASK_BEGIN) + checkpoints.log(UploadFlow.PROCESSING_BEGIN) + checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE) + return checkpoints + + class TestUploadFinisherTask(object): @pytest.mark.asyncio async def test_upload_finisher_task_call( @@ -26,6 +40,7 @@ async def test_upload_finisher_task_call( codecov_vcr, mock_storage, mock_redis, + mock_checkpoint_submit, ): url = "v4/raw/2019-05-22/C3C4715CA57C910D11D5EB899FC86A7E/4c4e4654ac25037ae869caeb3619d485970b6304/a84d445c-9c1e-434f-8275-f18f1f320f81.txt" redis_queue = [{"url": url}] @@ -55,12 +70,15 @@ async def test_upload_finisher_task_call( 
previous_results = { "processings_so_far": [{"arguments": {"url": url}, "successful": True}] } + + checkpoints = _create_checkpoint_logger(mocker) result = await UploadFinisherTask().run_async( dbsession, previous_results, repoid=commit.repoid, commitid=commit.commitid, commit_yaml={}, + checkpoints=checkpoints, ) assert commit.notified is False expected_result = {"notifications_called": True} @@ -74,6 +92,27 @@ async def test_upload_finisher_task_call( timeout=300, ) + assert checkpoints.data == { + UploadFlow.UPLOAD_TASK_BEGIN: 1337, + UploadFlow.PROCESSING_BEGIN: 9001, + UploadFlow.INITIAL_PROCESSING_COMPLETE: 10000, + UploadFlow.BATCH_PROCESSING_COMPLETE: 15000, + UploadFlow.PROCESSING_COMPLETE: 20000, + } + calls = [ + call( + "batch_processing_duration", + UploadFlow.INITIAL_PROCESSING_COMPLETE, + UploadFlow.BATCH_PROCESSING_COMPLETE, + ), + call( + "total_processing_duration", + UploadFlow.PROCESSING_BEGIN, + UploadFlow.PROCESSING_COMPLETE, + ), + ] + mock_checkpoint_submit.assert_has_calls(calls) + @pytest.mark.asyncio async def test_upload_finisher_task_call_no_author( self, mocker, mock_configuration, dbsession, mock_storage, mock_redis @@ -338,12 +377,15 @@ async def test_finish_reports_processing(self, dbsession, mocker): dbsession.add(commit) dbsession.flush() res = await UploadFinisherTask().finish_reports_processing( - dbsession, commit, UserYaml(commit_yaml), processing_results, None + dbsession, commit, UserYaml(commit_yaml), processing_results, None, None ) assert res == {"notifications_called": True} mocked_app.tasks["app.tasks.notify.Notify"].apply_async.assert_called_with( kwargs=dict( - commitid=commit.commitid, current_yaml=commit_yaml, repoid=commit.repoid + commitid=commit.commitid, + current_yaml=commit_yaml, + repoid=commit.repoid, + checkpoints=ANY, ) ) assert mocked_app.send_task.call_count == 0 @@ -382,12 +424,15 @@ async def test_finish_reports_processing_with_pull(self, dbsession, mocker): dbsession.add(pull) dbsession.flush() res = await UploadFinisherTask().finish_reports_processing( - dbsession, commit, UserYaml(commit_yaml), processing_results, None + dbsession, commit, UserYaml(commit_yaml), processing_results, None, None ) assert res == {"notifications_called": True} mocked_app.tasks["app.tasks.notify.Notify"].apply_async.assert_called_with( kwargs=dict( - commitid=commit.commitid, current_yaml=commit_yaml, repoid=commit.repoid + commitid=commit.commitid, + current_yaml=commit_yaml, + repoid=commit.repoid, + checkpoints=ANY, ) ) mocked_app.tasks["app.tasks.pulls.Sync"].apply_async.assert_called_with( @@ -418,7 +463,7 @@ async def test_finish_reports_processing_no_notification(self, dbsession, mocker dbsession.add(commit) dbsession.flush() res = await UploadFinisherTask().finish_reports_processing( - dbsession, commit, UserYaml(commit_yaml), processing_results, None + dbsession, commit, UserYaml(commit_yaml), processing_results, None, None ) assert res == {"notifications_called": False} assert mocked_app.send_task.call_count == 0 diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py index bd57a8e23..544c39695 100644 --- a/tasks/tests/unit/test_upload_task.py +++ b/tasks/tests/unit/test_upload_task.py @@ -16,6 +16,7 @@ from database.models.reports import CommitReport from database.tests.factories import CommitFactory, OwnerFactory, RepositoryFactory from database.tests.factories.core import ReportFactory +from helpers.checkpoint_logger import CheckpointLogger, UploadFlow from helpers.exceptions import 
RepositoryWithoutValidBotError from services.archive import ArchiveService from services.report import NotReadyToBuildReportYetError, ReportService @@ -26,6 +27,16 @@ here = Path(__file__) +def _create_checkpoint_logger(mocker): + mocker.patch( + "helpers.checkpoint_logger._get_milli_timestamp", + side_effect=[1337, 9001, 10000, 15000, 20000, 25000], + ) + checkpoints = CheckpointLogger(UploadFlow) + checkpoints.log(UploadFlow.UPLOAD_TASK_BEGIN) + return checkpoints + + class FakeRedis(object): """ This is a fake, very rudimentary redis implementation to ease the managing @@ -91,6 +102,7 @@ async def test_upload_task_call( mock_storage, mock_redis, celery_app, + mock_checkpoint_submit, ): mocked_1 = mocker.patch("tasks.upload.chain") url = "v4/raw/2019-05-22/C3C4715CA57C910D11D5EB899FC86A7E/4c4e4654ac25037ae869caeb3619d485970b6304/a84d445c-9c1e-434f-8275-f18f1f320f81.txt" @@ -112,7 +124,10 @@ async def test_upload_task_call( mock_redis.lists[ f"uploads/{commit.repoid}/{commit.commitid}" ] = jsonified_redis_queue - result = await UploadTask().run_async(dbsession, commit.repoid, commit.commitid) + checkpoints = _create_checkpoint_logger(mocker) + result = await UploadTask().run_async( + dbsession, commit.repoid, commit.commitid, checkpoints=checkpoints + ) expected_result = {"was_setup": False, "was_updated": True} assert expected_result == result assert commit.message == "dsidsahdsahdsa" @@ -148,10 +163,30 @@ async def test_upload_task_call( commitid="abf6d4df662c47e32460020ab14abf9303581429", commit_yaml={"codecov": {"max_report_age": "1y ago"}}, report_code=None, + checkpoints=mocker.ANY, ) ) mocked_1.assert_called_with(t1, t2) + assert checkpoints.data == { + UploadFlow.UPLOAD_TASK_BEGIN: 1337, + UploadFlow.PROCESSING_BEGIN: 9001, + UploadFlow.INITIAL_PROCESSING_COMPLETE: 10000, + } + calls = [ + mock.call( + "time_before_processing", + UploadFlow.UPLOAD_TASK_BEGIN, + UploadFlow.PROCESSING_BEGIN, + ), + mock.call( + "initial_processing_duration", + UploadFlow.PROCESSING_BEGIN, + UploadFlow.INITIAL_PROCESSING_COMPLETE, + ), + ] + mock_checkpoint_submit.assert_has_calls(calls) + @pytest.mark.asyncio async def test_upload_task_call_no_jobs( self, @@ -419,6 +454,7 @@ async def test_upload_task_call_multiple_processors( commitid="abf6d4df662c47e32460020ab14abf9303581429", commit_yaml={"codecov": {"max_report_age": "1y ago"}}, report_code=None, + checkpoints=mocker.ANY, ) ) mocked_1.assert_called_with(t1, t2, t3, t_final) @@ -539,6 +575,7 @@ async def test_upload_task_no_bot( {"build": "part2", "url": "url2", "upload_pk": mocker.ANY}, ], commit.report, + mocker.ANY, ) assert not mocked_fetch_yaml.called @@ -592,6 +629,7 @@ async def test_upload_task_bot_no_permissions( {"build": "part2", "url": "url2", "upload_pk": mocker.ANY}, ], commit.report, + mocker.ANY, ) assert not mocked_fetch_yaml.called @@ -660,6 +698,7 @@ async def test_upload_task_bot_unauthorized( {"build": "part2", "url": "url2", "upload_pk": second_session.id}, ], commit.report, + mocker.ANY, ) @pytest.mark.asyncio @@ -744,6 +783,7 @@ def fail_if_try_to_create_upload(*args, **kwargs): } ], report, + mocker.ANY, ) @@ -833,7 +873,11 @@ def test_schedule_task_with_no_tasks(self, dbsession): dbsession.add(commit) dbsession.flush() result = UploadTask().schedule_task( - commit, commit_yaml, argument_list, ReportFactory.create() + commit, + commit_yaml, + argument_list, + ReportFactory.create(), + None, ) assert result is None @@ -846,7 +890,11 @@ def test_schedule_task_with_one_task(self, dbsession, mocker): dbsession.add(commit) 
dbsession.flush() result = UploadTask().schedule_task( - commit, commit_yaml, argument_list, ReportFactory.create() + commit, + commit_yaml, + argument_list, + ReportFactory.create(), + None, ) assert result == mocked_chain.return_value.apply_async.return_value t1 = upload_processor_task.signature( @@ -865,6 +913,7 @@ def test_schedule_task_with_one_task(self, dbsession, mocker): commitid=commit.commitid, commit_yaml=commit_yaml.to_dict(), report_code=None, + checkpoints=mocker.ANY, ) ) mocked_chain.assert_called_with(t1, t2) diff --git a/tasks/upload.py b/tasks/upload.py index dc3051c92..1d08c72d1 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -20,6 +20,7 @@ from app import celery_app from database.enums import CommitErrorTypes from database.models import Commit +from helpers.checkpoint_logger import CheckpointLogger, UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.save_commit_error import save_commit_error from services.archive import ArchiveService @@ -137,6 +138,12 @@ def is_currently_processing(self, redis_connection, repoid, commitid): async def run_async( self, db_session, repoid, commitid, report_code=None, *args, **kwargs ): + # If we're a retry, we already computed the first checkpoint + if "checkpoints" not in kwargs: + checkpoints = CheckpointLogger(UploadFlow) + checkpoints.log(UploadFlow.UPLOAD_TASK_BEGIN) + kwargs["checkpoints"] = checkpoints + log.info( "Received upload task", extra=dict(repoid=repoid, commit=commitid, report_code=report_code), @@ -158,7 +165,7 @@ async def run_async( ), ), ) - self.retry(countdown=60) + self.retry(countdown=60, args=args, kwargs=kwargs) try: with redis_connection.lock( lock_name, @@ -208,7 +215,9 @@ async def run_async( commit=commitid, repoid=repoid, countdown=int(retry_countdown) ), ) - self.retry(max_retries=3, countdown=retry_countdown) + self.retry( + max_retries=3, countdown=retry_countdown, args=args, kwargs=kwargs + ) async def run_async_within_lock( self, @@ -235,6 +244,7 @@ async def run_async_within_lock( "was_updated": False, "tasks_were_scheduled": False, } + upload_processing_delay = get_config("setup", "upload_processing_delay") if upload_processing_delay is not None: upload_processing_delay = int(upload_processing_delay) @@ -254,7 +264,17 @@ async def run_async_within_lock( repoid=repoid, commit=commitid, countdown=retry_countdown ), ) - self.retry(countdown=retry_countdown) + self.retry(countdown=retry_countdown, args=args, kwargs=kwargs) + + checkpoints = kwargs.get("checkpoints") + if checkpoints: + checkpoints.log(UploadFlow.PROCESSING_BEGIN) + checkpoints.submit_subflow( + "time_before_processing", + UploadFlow.UPLOAD_TASK_BEGIN, + UploadFlow.PROCESSING_BEGIN, + ) + commit = None commits = db_session.query(Commit).filter( Commit.repoid == repoid, Commit.commitid == commitid @@ -321,7 +341,7 @@ async def run_async_within_lock( "Commit not yet ready to build its initial report. 
Retrying in 60s.", extra=dict(repoid=commit.repoid, commit=commit.commitid), ) - self.retry(countdown=60) + self.retry(countdown=60, args=args, kwargs=kwargs) argument_list = [] for arguments in self.lists_of_arguments(redis_connection, repoid, commitid): normalized_arguments = self.normalize_upload_arguments( @@ -339,8 +359,16 @@ async def run_async_within_lock( argument_list.append(normalized_arguments) if argument_list: db_session.commit() - self.schedule_task(commit, commit_yaml, argument_list, commit_report) + self.schedule_task( + commit, commit_yaml, argument_list, commit_report, checkpoints + ) else: + checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE) + checkpoints.submit_subflow( + "initial_processing_duration", + UploadFlow.PROCESSING_BEGIN, + UploadFlow.INITIAL_PROCESSING_COMPLETE, + ) log.info( "Not scheduling task because there were no arguments were found on redis", extra=dict( @@ -396,7 +424,9 @@ async def fetch_commit_yaml_and_possibly_store(self, commit, repository_service) ownerid=repository.owner.ownerid, ) - def schedule_task(self, commit, commit_yaml, argument_list, commit_report): + def schedule_task( + self, commit, commit_yaml, argument_list, commit_report, checkpoints=None + ): commit_yaml = commit_yaml.to_dict() chain_to_call = [] for i in range(0, len(argument_list), CHUNK_SIZE): @@ -420,10 +450,18 @@ def schedule_task(self, commit, commit_yaml, argument_list, commit_report): commitid=commit.commitid, commit_yaml=commit_yaml, report_code=commit_report.code, + checkpoints=checkpoints, ) ) chain_to_call.append(finish_sig) res = chain(*chain_to_call).apply_async() + if checkpoints: + checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE) + checkpoints.submit_subflow( + "initial_processing_duration", + UploadFlow.PROCESSING_BEGIN, + UploadFlow.INITIAL_PROCESSING_COMPLETE, + ) log.info( "Scheduling task for %s different reports", len(argument_list), diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index 51006b1eb..b5aa50d0e 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -2,6 +2,7 @@ import re from copy import deepcopy +import sentry_sdk from shared.celery_config import ( compute_comparison_task_name, notify_task_name, @@ -12,6 +13,7 @@ from app import celery_app from database.models import Commit, Pull +from helpers.checkpoint_logger import UploadFlow from services.comparison import get_or_create_comparison from services.redis import get_redis_connection from services.report import ReportService @@ -50,8 +52,17 @@ async def run_async( commitid, commit_yaml, report_code=None, + checkpoints=None, **kwargs, ): + if checkpoints: + checkpoints.log(UploadFlow.BATCH_PROCESSING_COMPLETE) + checkpoints.submit_subflow( + "batch_processing_duration", + UploadFlow.INITIAL_PROCESSING_COMPLETE, + UploadFlow.BATCH_PROCESSING_COMPLETE, + ) + log.info( "Received upload_finisher task", extra=dict( @@ -80,6 +91,7 @@ async def run_async( commit_yaml, processing_results, report_code, + checkpoints, ) save_commit_measurements(commit) self.invalidate_caches(redis_connection, commit) @@ -92,6 +104,7 @@ async def finish_reports_processing( commit_yaml: UserYaml, processing_results, report_code, + checkpoints, ): log.debug("In finish_reports_processing for commit: %s" % commit) commitid = commit.commitid @@ -109,6 +122,7 @@ async def finish_reports_processing( repoid=repoid, commitid=commitid, current_yaml=commit_yaml.to_dict(), + checkpoints=checkpoints, ) ) log.info( @@ -165,6 +179,14 @@ async def finish_reports_processing( ) else: commit.state 
= "skipped" + + if checkpoints: + checkpoints.log(UploadFlow.PROCESSING_COMPLETE) + checkpoints.submit_subflow( + "total_processing_duration", + UploadFlow.PROCESSING_BEGIN, + UploadFlow.PROCESSING_COMPLETE, + ) return {"notifications_called": notifications_called} def should_call_notifications( From 73f311b0ece384fdd830be4284d57eb35892df8c Mon Sep 17 00:00:00 2001 From: Matt Hammerly Date: Fri, 11 Aug 2023 13:27:23 -0700 Subject: [PATCH 2/2] rework so checkpoint data is serialized between tasks --- helpers/checkpoint_logger.py | 63 +++++++++++--- helpers/tests/unit/test_checkpoint_logger.py | 87 +++++++++++++++++-- tasks/notify.py | 18 ++-- tasks/tests/unit/test_notify_task.py | 12 ++- tasks/tests/unit/test_upload_finisher_task.py | 56 ++++++++---- tasks/tests/unit/test_upload_task.py | 57 ++++++------ tasks/upload.py | 48 +++++----- tasks/upload_finisher.py | 27 +++--- 8 files changed, 256 insertions(+), 112 deletions(-) diff --git a/helpers/checkpoint_logger.py b/helpers/checkpoint_logger.py index 990bcd172..adf619703 100644 --- a/helpers/checkpoint_logger.py +++ b/helpers/checkpoint_logger.py @@ -1,10 +1,14 @@ +import logging import time from enum import Enum, auto import sentry_sdk +from shared.metrics import metrics +logger = logging.getLogger(__name__) -class UploadFlow(Enum): + +class UploadFlow(str, Enum): UPLOAD_TASK_BEGIN = auto() PROCESSING_BEGIN = auto() INITIAL_PROCESSING_COMPLETE = auto() @@ -17,13 +21,42 @@ def _get_milli_timestamp(): return time.time_ns() // 1000000 +def _kwargs_key(cls): + return f"checkpoints_{cls.__name__}" + + +def from_kwargs(cls, kwargs, strict=False): + data = kwargs.get(_kwargs_key(cls), {}) + + # Make sure these checkpoints were made with the same flow + for key in data.keys(): + if key not in iter(cls): + raise ValueError(f"Checkpoint {key} not part of flow `{cls.__name__}`") + + return CheckpointLogger(cls, data, strict) + + class CheckpointLogger: - def __init__(self, cls): + def __init__(self, cls, data=None, strict=False): self.cls = cls - self.data = {} + self.data = data if data else {} + self.kwargs_key = _kwargs_key(self.cls) + self.strict = strict + + def _error(self, msg): + # When a new version of worker rolls out, it will pick up tasks that + # may have been enqueued by the old worker and be missing checkpoints + # data. At least for that reason, we want to allow failing softly. 
+ metrics.incr("worker.checkpoint_logger.error") + if self.strict: + raise ValueError(msg) + else: + logger.error(msg) def _validate_checkpoint(self, checkpoint): if checkpoint.__class__ != self.cls: + # This error is not ignored when `self.strict==False` because it's definitely + # a code mistake raise ValueError( f"Checkpoint {checkpoint} not part of flow `{self.cls.__name__}`" ) @@ -32,24 +65,34 @@ def _subflow_duration(self, start, end): self._validate_checkpoint(start) self._validate_checkpoint(end) if start not in self.data: - raise ValueError( + return self._error( f"Cannot compute duration; missing start checkpoint {start}" ) elif end not in self.data: - raise ValueError(f"Cannot compute duration; missing end checkpoint {end}") + return self._error(f"Cannot compute duration; missing end checkpoint {end}") elif end.value <= start.value: + # This error is not ignored when `self.strict==False` because it's definitely + # a code mistake raise ValueError( f"Cannot compute duration; end {end} is not after start {start}" ) return self.data[end] - self.data[start] - def log(self, checkpoint): - if checkpoint in self.data: - raise ValueError(f"Already recorded checkpoint {checkpoint}") - self._validate_checkpoint(checkpoint) - self.data[checkpoint] = _get_milli_timestamp() + def log(self, checkpoint, ignore_repeat=False, kwargs=None): + if checkpoint not in self.data: + self._validate_checkpoint(checkpoint) + self.data[checkpoint] = _get_milli_timestamp() + elif not ignore_repeat: + self._error(f"Already recorded checkpoint {checkpoint}") + + if kwargs is not None: + kwargs[self.kwargs_key] = self.data + + return self def submit_subflow(self, metric, start, end): duration = self._subflow_duration(start, end) sentry_sdk.set_measurement(metric, duration, "milliseconds") + + return self diff --git a/helpers/tests/unit/test_checkpoint_logger.py b/helpers/tests/unit/test_checkpoint_logger.py index c2a97a7a4..1e8a1becf 100644 --- a/helpers/tests/unit/test_checkpoint_logger.py +++ b/helpers/tests/unit/test_checkpoint_logger.py @@ -5,7 +5,11 @@ import pytest import sentry_sdk -from helpers.checkpoint_logger import CheckpointLogger, _get_milli_timestamp +from helpers.checkpoint_logger import ( + CheckpointLogger, + _get_milli_timestamp, + from_kwargs, +) class TestEnum1(Enum): @@ -47,15 +51,15 @@ def test_log_multiple_checkpoints(self, mocker): self.assertEqual(checkpoints.data[TestEnum1.B], 9001) self.assertEqual(checkpoints.data[TestEnum1.C], 100000) - def test_log_checkpoint_twice_throws(self): - checkpoints = CheckpointLogger(TestEnum1) + def test_log_checkpoint_twice_ahrows(self): + checkpoints = CheckpointLogger(TestEnum1, strict=True) checkpoints.log(TestEnum1.A) with self.assertRaises(ValueError): checkpoints.log(TestEnum1.A) def test_log_checkpoint_wrong_enum_throws(self): - checkpoints = CheckpointLogger(TestEnum1) + checkpoints = CheckpointLogger(TestEnum1, strict=True) with self.assertRaises(ValueError): checkpoints.log(TestEnum2.A) @@ -71,7 +75,7 @@ def test_subflow_duration(self, mocker): @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001]) def test_subflow_duration_missing_checkpoints(self, mocker): - checkpoints = CheckpointLogger(TestEnum1) + checkpoints = CheckpointLogger(TestEnum1, strict=True) checkpoints.log(TestEnum1.A) checkpoints.log(TestEnum1.C) @@ -85,7 +89,7 @@ def test_subflow_duration_missing_checkpoints(self, mocker): @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001]) def 
test_subflow_duration_wrong_order(self, mocker): - checkpoints = CheckpointLogger(TestEnum1) + checkpoints = CheckpointLogger(TestEnum1, strict=True) checkpoints.log(TestEnum1.A) checkpoints.log(TestEnum1.B) @@ -99,7 +103,7 @@ def test_subflow_duration_wrong_order(self, mocker): @patch("helpers.checkpoint_logger._get_milli_timestamp", return_value=1337) def test_subflow_duration_wrong_enum(self, mocker): - checkpoints = CheckpointLogger(TestEnum1) + checkpoints = CheckpointLogger(TestEnum1, strict=True) checkpoints.log(TestEnum1.A) # Wrong enum for start checkpoint @@ -121,3 +125,72 @@ def test_submit_subflow(self, mock_sentry, mock_timestamp): expected_duration = 9001 - 1337 checkpoints.submit_subflow("metricname", TestEnum1.A, TestEnum1.B) mock_sentry.assert_called_with("metricname", expected_duration, "milliseconds") + + @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337]) + def test_log_ignore_repeat(self, mock_timestamp): + checkpoints = CheckpointLogger(TestEnum1, strict=True) + + checkpoints.log(TestEnum1.A) + time = checkpoints.data[TestEnum1.A] + + checkpoints.log(TestEnum1.A, ignore_repeat=True) + assert checkpoints.data[TestEnum1.A] == time + + def test_create_from_kwargs(self): + good_data = { + TestEnum1.A: 1337, + TestEnum1.B: 9001, + } + good_kwargs = { + "checkpoints_TestEnum1": good_data, + } + checkpoints = from_kwargs(TestEnum1, good_kwargs, strict=True) + assert checkpoints.data == good_data + + # Data is from TestEnum2 but we expected TestEnum1 + bad_data = { + TestEnum2.A: 1337, + TestEnum2.B: 9001, + } + bad_kwargs = { + "checkpoints_TestEnum1": bad_data, + } + with self.assertRaises(ValueError): + checkpoints = from_kwargs(TestEnum1, bad_kwargs, strict=True) + + @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001]) + def test_log_to_kwargs(self, mock_timestamp): + kwargs = {} + + checkpoints = CheckpointLogger(TestEnum1) + checkpoints.log(TestEnum1.A, kwargs=kwargs) + assert "checkpoints_TestEnum1" in kwargs + assert kwargs["checkpoints_TestEnum1"][TestEnum1.A] == 1337 + assert TestEnum1.B not in kwargs["checkpoints_TestEnum1"] + + checkpoints.log(TestEnum1.B, kwargs=kwargs) + assert "checkpoints_TestEnum1" in kwargs + assert kwargs["checkpoints_TestEnum1"][TestEnum1.A] == 1337 + assert kwargs["checkpoints_TestEnum1"][TestEnum1.B] == 9001 + + pass + + @pytest.mark.real_checkpoint_logger + @patch("sentry_sdk.set_measurement") + @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[9001]) + def test_create_log_oneliner(self, mock_timestamp, mock_sentry): + kwargs = { + "checkpoints_TestEnum1": { + TestEnum1.A: 1337, + }, + } + + expected_duration = 9001 - 1337 + + from_kwargs(TestEnum1, kwargs, strict=True).log( + TestEnum1.B, kwargs=kwargs + ).submit_subflow("x", TestEnum1.A, TestEnum1.B) + + mock_sentry.assert_called_with("x", expected_duration, "milliseconds") + assert kwargs["checkpoints_TestEnum1"][TestEnum1.A] == 1337 + assert kwargs["checkpoints_TestEnum1"][TestEnum1.B] == 9001 diff --git a/tasks/notify.py b/tasks/notify.py index 99df5a7c0..319ff44d9 100644 --- a/tasks/notify.py +++ b/tasks/notify.py @@ -16,6 +16,7 @@ from database.enums import CommitErrorTypes, Decoration from database.models import Commit, Pull from helpers.checkpoint_logger import UploadFlow +from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs from helpers.exceptions import RepositoryWithoutValidBotError from helpers.save_commit_error import save_commit_error from services.activation import 
activate_user @@ -50,7 +51,6 @@ async def run_async( commitid: str, current_yaml=None, empty_upload=None, - checkpoints=None, **kwargs, ): redis_connection = get_redis_connection() @@ -81,7 +81,6 @@ async def run_async( commitid=commitid, current_yaml=current_yaml, empty_upload=empty_upload, - checkpoints=checkpoints, **kwargs, ) except LockError as err: @@ -108,7 +107,6 @@ async def run_async_within_lock( commitid: str, current_yaml=None, empty_upload=None, - checkpoints=None, **kwargs, ): log.info("Starting notifications", extra=dict(commit=commitid, repoid=repoid)) @@ -257,13 +255,13 @@ async def run_async_within_lock( enriched_pull, empty_upload, ) - if checkpoints: - checkpoints.log(UploadFlow.NOTIFIED) - checkpoints.submit_subflow( - "notification_latency", - UploadFlow.UPLOAD_TASK_BEGIN, - UploadFlow.NOTIFIED, - ) + checkpoints_from_kwargs(UploadFlow, kwargs).log( + UploadFlow.NOTIFIED + ).submit_subflow( + "notification_latency", + UploadFlow.UPLOAD_TASK_BEGIN, + UploadFlow.NOTIFIED, + ) log.info( "Notifications done", extra=dict( diff --git a/tasks/tests/unit/test_notify_task.py b/tasks/tests/unit/test_notify_task.py index 6421b8d74..b879c95cf 100644 --- a/tasks/tests/unit/test_notify_task.py +++ b/tasks/tests/unit/test_notify_task.py @@ -18,7 +18,7 @@ PullFactory, RepositoryFactory, ) -from helpers.checkpoint_logger import CheckpointLogger, UploadFlow +from helpers.checkpoint_logger import CheckpointLogger, UploadFlow, _kwargs_key from helpers.exceptions import RepositoryWithoutValidBotError from services.decoration import DecorationDetails from services.notification import NotificationService @@ -420,6 +420,7 @@ async def test_simple_call_yes_notifications_no_base( dbsession.flush() checkpoints = _create_checkpoint_logger(mocker) + kwargs = {_kwargs_key(UploadFlow): checkpoints.data} task = NotifyTask() result = await task.run_async_within_lock( @@ -427,9 +428,8 @@ async def test_simple_call_yes_notifications_no_base( repoid=commit.repoid, commitid=commit.commitid, current_yaml={"coverage": {"status": {"patch": True}}}, - checkpoints=checkpoints, + **kwargs, ) - print(result) expected_result = { "notified": True, "notifications": [ @@ -907,18 +907,22 @@ async def test_run_async_can_run_logic(self, dbsession, mock_redis, mocker): current_yaml = {"codecov": {"require_ci_to_pass": True}} task = NotifyTask() mock_redis.get.return_value = False + checkpoints = _create_checkpoint_logger(mocker) + kwargs = {_kwargs_key(UploadFlow): checkpoints.data} res = await task.run_async( dbsession, repoid=commit.repoid, commitid=commit.commitid, current_yaml=current_yaml, + **kwargs, ) assert res == {"notifications": [], "notified": True, "reason": "yay"} + kwargs = {_kwargs_key(UploadFlow): mocker.ANY} mocked_run_async_within_lock.assert_called_with( dbsession, repoid=commit.repoid, commitid=commit.commitid, current_yaml=current_yaml, empty_upload=None, - checkpoints=mocker.ANY, + **kwargs, ) diff --git a/tasks/tests/unit/test_upload_finisher_task.py b/tasks/tests/unit/test_upload_finisher_task.py index 3f19629ff..a60c3f0f0 100644 --- a/tasks/tests/unit/test_upload_finisher_task.py +++ b/tasks/tests/unit/test_upload_finisher_task.py @@ -12,7 +12,7 @@ RepositoryFactory, UploadFactory, ) -from helpers.checkpoint_logger import CheckpointLogger, UploadFlow +from helpers.checkpoint_logger import CheckpointLogger, UploadFlow, _kwargs_key from tasks.upload_finisher import ReportService, UploadFinisherTask here = Path(__file__) @@ -72,13 +72,14 @@ async def test_upload_finisher_task_call( } checkpoints 
= _create_checkpoint_logger(mocker) + kwargs = {_kwargs_key(UploadFlow): checkpoints.data} result = await UploadFinisherTask().run_async( dbsession, previous_results, repoid=commit.repoid, commitid=commit.commitid, commit_yaml={}, - checkpoints=checkpoints, + **kwargs, ) assert commit.notified is False expected_result = {"notifications_called": True} @@ -376,17 +377,24 @@ async def test_finish_reports_processing(self, dbsession, mocker): processing_results = {"processings_so_far": [{"successful": True}]} dbsession.add(commit) dbsession.flush() + + checkpoints = _create_checkpoint_logger(mocker) res = await UploadFinisherTask().finish_reports_processing( - dbsession, commit, UserYaml(commit_yaml), processing_results, None, None + dbsession, + commit, + UserYaml(commit_yaml), + processing_results, + None, + checkpoints, ) assert res == {"notifications_called": True} mocked_app.tasks["app.tasks.notify.Notify"].apply_async.assert_called_with( - kwargs=dict( - commitid=commit.commitid, - current_yaml=commit_yaml, - repoid=commit.repoid, - checkpoints=ANY, - ) + kwargs={ + "commitid": commit.commitid, + "current_yaml": commit_yaml, + "repoid": commit.repoid, + _kwargs_key(UploadFlow): ANY, + }, ) assert mocked_app.send_task.call_count == 0 @@ -423,17 +431,24 @@ async def test_finish_reports_processing_with_pull(self, dbsession, mocker): dbsession.add(compared_to) dbsession.add(pull) dbsession.flush() + + checkpoints = _create_checkpoint_logger(mocker) res = await UploadFinisherTask().finish_reports_processing( - dbsession, commit, UserYaml(commit_yaml), processing_results, None, None + dbsession, + commit, + UserYaml(commit_yaml), + processing_results, + None, + checkpoints, ) assert res == {"notifications_called": True} mocked_app.tasks["app.tasks.notify.Notify"].apply_async.assert_called_with( - kwargs=dict( - commitid=commit.commitid, - current_yaml=commit_yaml, - repoid=commit.repoid, - checkpoints=ANY, - ) + kwargs={ + "commitid": commit.commitid, + "current_yaml": commit_yaml, + "repoid": commit.repoid, + _kwargs_key(UploadFlow): ANY, + }, ) mocked_app.tasks["app.tasks.pulls.Sync"].apply_async.assert_called_with( kwargs={ @@ -462,8 +477,15 @@ async def test_finish_reports_processing_no_notification(self, dbsession, mocker processing_results = {"processings_so_far": [{"successful": False}]} dbsession.add(commit) dbsession.flush() + + checkpoints = _create_checkpoint_logger(mocker) res = await UploadFinisherTask().finish_reports_processing( - dbsession, commit, UserYaml(commit_yaml), processing_results, None, None + dbsession, + commit, + UserYaml(commit_yaml), + processing_results, + None, + checkpoints, ) assert res == {"notifications_called": False} assert mocked_app.send_task.call_count == 0 diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py index 544c39695..789a8604d 100644 --- a/tasks/tests/unit/test_upload_task.py +++ b/tasks/tests/unit/test_upload_task.py @@ -16,7 +16,7 @@ from database.models.reports import CommitReport from database.tests.factories import CommitFactory, OwnerFactory, RepositoryFactory from database.tests.factories.core import ReportFactory -from helpers.checkpoint_logger import CheckpointLogger, UploadFlow +from helpers.checkpoint_logger import CheckpointLogger, UploadFlow, _kwargs_key from helpers.exceptions import RepositoryWithoutValidBotError from services.archive import ArchiveService from services.report import NotReadyToBuildReportYetError, ReportService @@ -125,8 +125,12 @@ async def test_upload_task_call( 
f"uploads/{commit.repoid}/{commit.commitid}" ] = jsonified_redis_queue checkpoints = _create_checkpoint_logger(mocker) + kwargs = {_kwargs_key(UploadFlow): checkpoints.data} result = await UploadTask().run_async( - dbsession, commit.repoid, commit.commitid, checkpoints=checkpoints + dbsession, + commit.repoid, + commit.commitid, + kwargs=kwargs, ) expected_result = {"was_setup": False, "was_updated": True} assert expected_result == result @@ -157,22 +161,16 @@ async def test_upload_task_call( report_code=None, ), ) - t2 = upload_finisher_task.signature( - kwargs=dict( - repoid=commit.repoid, - commitid="abf6d4df662c47e32460020ab14abf9303581429", - commit_yaml={"codecov": {"max_report_age": "1y ago"}}, - report_code=None, - checkpoints=mocker.ANY, - ) + kwargs = dict( + repoid=commit.repoid, + commitid="abf6d4df662c47e32460020ab14abf9303581429", + commit_yaml={"codecov": {"max_report_age": "1y ago"}}, + report_code=None, ) + kwargs[_kwargs_key(UploadFlow)] = mocker.ANY + t2 = upload_finisher_task.signature(kwargs=kwargs) mocked_1.assert_called_with(t1, t2) - assert checkpoints.data == { - UploadFlow.UPLOAD_TASK_BEGIN: 1337, - UploadFlow.PROCESSING_BEGIN: 9001, - UploadFlow.INITIAL_PROCESSING_COMPLETE: 10000, - } calls = [ mock.call( "time_before_processing", @@ -448,15 +446,14 @@ async def test_upload_task_call_multiple_processors( report_code=None, ), ) - t_final = upload_finisher_task.signature( - kwargs=dict( - repoid=commit.repoid, - commitid="abf6d4df662c47e32460020ab14abf9303581429", - commit_yaml={"codecov": {"max_report_age": "1y ago"}}, - report_code=None, - checkpoints=mocker.ANY, - ) + kwargs = dict( + repoid=commit.repoid, + commitid="abf6d4df662c47e32460020ab14abf9303581429", + commit_yaml={"codecov": {"max_report_age": "1y ago"}}, + report_code=None, ) + kwargs[_kwargs_key(UploadFlow)] = mocker.ANY + t_final = upload_finisher_task.signature(kwargs=kwargs) mocked_1.assert_called_with(t1, t2, t3, t_final) mock_redis.lock.assert_any_call( f"upload_lock_{commit.repoid}_{commit.commitid}", @@ -908,13 +905,13 @@ def test_schedule_task_with_one_task(self, dbsession, mocker): ), ) t2 = upload_finisher_task.signature( - kwargs=dict( - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml=commit_yaml.to_dict(), - report_code=None, - checkpoints=mocker.ANY, - ) + kwargs={ + "repoid": commit.repoid, + "commitid": commit.commitid, + "commit_yaml": commit_yaml.to_dict(), + "report_code": None, + _kwargs_key(UploadFlow): mocker.ANY, + } ) mocked_chain.assert_called_with(t1, t2) diff --git a/tasks/upload.py b/tasks/upload.py index 1d08c72d1..89cb810f9 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -20,7 +20,8 @@ from app import celery_app from database.enums import CommitErrorTypes from database.models import Commit -from helpers.checkpoint_logger import CheckpointLogger, UploadFlow +from helpers.checkpoint_logger import UploadFlow, _kwargs_key +from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs from helpers.exceptions import RepositoryWithoutValidBotError from helpers.save_commit_error import save_commit_error from services.archive import ArchiveService @@ -138,11 +139,11 @@ def is_currently_processing(self, redis_connection, repoid, commitid): async def run_async( self, db_session, repoid, commitid, report_code=None, *args, **kwargs ): - # If we're a retry, we already computed the first checkpoint - if "checkpoints" not in kwargs: - checkpoints = CheckpointLogger(UploadFlow) - checkpoints.log(UploadFlow.UPLOAD_TASK_BEGIN) - kwargs["checkpoints"] = 
checkpoints + # If we're a retry, kwargs will already have our first checkpoint. + # If not, log it directly into kwargs so we can pass it onto other tasks + checkpoints_from_kwargs(UploadFlow, kwargs).log( + UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs, ignore_repeat=True + ) log.info( "Received upload task", @@ -266,14 +267,15 @@ async def run_async_within_lock( ) self.retry(countdown=retry_countdown, args=args, kwargs=kwargs) - checkpoints = kwargs.get("checkpoints") - if checkpoints: - checkpoints.log(UploadFlow.PROCESSING_BEGIN) - checkpoints.submit_subflow( + try: + checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs) + checkpoints.log(UploadFlow.PROCESSING_BEGIN).submit_subflow( "time_before_processing", UploadFlow.UPLOAD_TASK_BEGIN, UploadFlow.PROCESSING_BEGIN, ) + except ValueError as e: + log.warning(f"CheckpointLogger failed to log/submit", extra=dict(error=e)) commit = None commits = db_session.query(Commit).filter( @@ -444,17 +446,7 @@ def schedule_task( ) chain_to_call.append(sig) if chain_to_call: - finish_sig = upload_finisher_task.signature( - kwargs=dict( - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml=commit_yaml, - report_code=commit_report.code, - checkpoints=checkpoints, - ) - ) - chain_to_call.append(finish_sig) - res = chain(*chain_to_call).apply_async() + checkpoint_data = None if checkpoints: checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE) checkpoints.submit_subflow( @@ -462,6 +454,20 @@ def schedule_task( UploadFlow.PROCESSING_BEGIN, UploadFlow.INITIAL_PROCESSING_COMPLETE, ) + checkpoint_data = checkpoints.data + + finish_sig = upload_finisher_task.signature( + kwargs={ + "repoid": commit.repoid, + "commitid": commit.commitid, + "commit_yaml": commit_yaml, + "report_code": commit_report.code, + _kwargs_key(UploadFlow): checkpoint_data, + }, + ) + chain_to_call.append(finish_sig) + res = chain(*chain_to_call).apply_async() + log.info( "Scheduling task for %s different reports", len(argument_list), diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index b5aa50d0e..ef350e7b6 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -13,7 +13,8 @@ from app import celery_app from database.models import Commit, Pull -from helpers.checkpoint_logger import UploadFlow +from helpers.checkpoint_logger import UploadFlow, _kwargs_key +from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs from services.comparison import get_or_create_comparison from services.redis import get_redis_connection from services.report import ReportService @@ -52,16 +53,17 @@ async def run_async( commitid, commit_yaml, report_code=None, - checkpoints=None, **kwargs, ): - if checkpoints: - checkpoints.log(UploadFlow.BATCH_PROCESSING_COMPLETE) - checkpoints.submit_subflow( + try: + checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs) + checkpoints.log(UploadFlow.BATCH_PROCESSING_COMPLETE).submit_subflow( "batch_processing_duration", UploadFlow.INITIAL_PROCESSING_COMPLETE, UploadFlow.BATCH_PROCESSING_COMPLETE, ) + except ValueError as e: + log.warning(f"CheckpointLogger failed to log/submit", extra=dict(error=e)) log.info( "Received upload_finisher task", @@ -118,12 +120,12 @@ async def finish_reports_processing( ): notifications_called = True task = self.app.tasks[notify_task_name].apply_async( - kwargs=dict( - repoid=repoid, - commitid=commitid, - current_yaml=commit_yaml.to_dict(), - checkpoints=checkpoints, - ) + kwargs={ + "repoid": repoid, + "commitid": commitid, + "current_yaml": commit_yaml.to_dict(), + 
_kwargs_key(UploadFlow): checkpoints.data,
+            },
         )
         log.info(
             "Scheduling notify task",
@@ -181,8 +183,7 @@
             commit.state = "skipped"
 
         if checkpoints:
-            checkpoints.log(UploadFlow.PROCESSING_COMPLETE)
-            checkpoints.submit_subflow(
+            checkpoints.log(UploadFlow.PROCESSING_COMPLETE).submit_subflow(
                 "total_processing_duration",
                 UploadFlow.PROCESSING_BEGIN,
                 UploadFlow.PROCESSING_COMPLETE,
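
Reviewer note: to summarize the contract this second patch establishes, here is a sketch of the round-trip the rework enables, not verbatim worker code; the Celery task plumbing is elided, and all names come from the diff above. Checkpoint data now travels between tasks as a plain dict under a per-flow kwargs key, so it survives serialization:

    from helpers.checkpoint_logger import UploadFlow, _kwargs_key
    from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs

    # Upstream task: create-or-restore the logger, log, and write the data back
    # into kwargs so it rides along when these kwargs reach the next signature
    kwargs = {}
    checkpoints_from_kwargs(UploadFlow, kwargs).log(
        UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs, ignore_repeat=True
    )
    assert _kwargs_key(UploadFlow) in kwargs  # "checkpoints_UploadFlow"

    # Downstream task: rebuild the logger from the deserialized kwargs, then
    # chain log() and submit_subflow(), both of which return self
    checkpoints_from_kwargs(UploadFlow, kwargs).log(
        UploadFlow.PROCESSING_BEGIN, kwargs=kwargs
    ).submit_subflow(
        "time_before_processing",
        UploadFlow.UPLOAD_TASK_BEGIN,
        UploadFlow.PROCESSING_BEGIN,
    )

Because the methods chain, the tasks can do the restore-log-submit dance in one expression, wrapped in try/except ValueError (as tasks/upload.py and tasks/upload_finisher.py do above) so metrics problems never fail the task itself.
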