Skip to content

Commit

Permalink
feat: add TestResultsFlow for measuring time to notification
Browse files Browse the repository at this point in the history
Signed-off-by: joseph-sentry <[email protected]>
  • Loading branch information
joseph-sentry committed May 8, 2024
1 parent f2ce06f commit 7638bed
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 40 deletions.
14 changes: 14 additions & 0 deletions helpers/checkpoint_logger/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,17 @@ class UploadFlow(BaseFlow):
NOTIF_TOO_MANY_RETRIES = auto()
NOTIF_STALE_HEAD = auto()
NOTIF_ERROR_NO_REPORT = auto()


@failure_events("TEST_RESULTS_ERROR")
@success_events("TEST_RESULTS_BEGIN")
@subflows(
("notification_latency", "TEST_RESULTS_BEGIN", "TEST_RESULTS_NOTIFY"),
("notification_latency", "TEST_RESULTS_BEGIN", "FLAKE_DETECTION_NOTIFY"),
)
@reliability_counters
class TestResultsFlow(BaseFlow):
TEST_RESULTS_BEGIN = auto()
TEST_RESULTS_NOTIFY = auto()
FLAKE_DETECTION_NOTIFY = auto()
TEST_RESULTS_ERROR = auto()
11 changes: 10 additions & 1 deletion tasks/test_results_finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from app import celery_app
from database.enums import FlakeSymptomType, ReportType, TestResultsProcessingError
from database.models import Commit, TestResultReportTotals
from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
from helpers.checkpoint_logger.flows import TestResultsFlow
from helpers.string import EscapeEnum, Replacement, StringEscaper, shorten_file_paths
from rollouts import FLAKY_TEST_DETECTION
from services.failure_normalizer import FailureNormalizer
Expand Down Expand Up @@ -66,6 +68,7 @@ def run_impl(
commit_yaml: dict,
**kwargs,
):

repoid = int(repoid)
commit_yaml = UserYaml.from_dict(commit_yaml)

Expand Down Expand Up @@ -135,6 +138,8 @@ def process_impl_within_lock(
),
)

checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs)

commit: Commit = (
db_session.query(Commit).filter_by(repoid=repoid, commitid=commitid).first()
)
Expand Down Expand Up @@ -270,6 +275,7 @@ def process_impl_within_lock(
)

with metrics.timing("test_results.finisher.notification"):
checkpoints.log(TestResultsFlow.TEST_RESULTS_NOTIFY)
success, reason = async_to_sync(notifier.notify)(payload)

log.info(
Expand Down Expand Up @@ -301,7 +307,7 @@ def process_impl_within_lock(
)
with metrics.timing("test_results.finisher.run_flaky_test_detection"):
success, reason = self.run_flaky_test_detection(
db_session, repoid, notifier, payload
db_session, repoid, notifier, payload, checkpoints=checkpoints
)

metrics.incr(
Expand All @@ -321,6 +327,7 @@ def run_flaky_test_detection(
repoid,
notifier: TestResultsNotifier,
payload: TestResultsNotificationPayload,
checkpoints=None,
):
ignore_predefined = read_yaml_field(
"test_analytics", "ignore_predefined", _else=False
Expand Down Expand Up @@ -375,6 +382,8 @@ def run_flaky_test_detection(
)
db_session.flush()

if checkpoints:
checkpoints.log(TestResultsFlow.TEST_RESULTS_NOTIFY)
success, reason = async_to_sync(notifier.notify)(payload)
log.info(
"Added flaky test information to the PR comment",
Expand Down
55 changes: 28 additions & 27 deletions tasks/tests/unit/test_upload_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ def test_upload_task_call(
dbsession.flush()
dbsession.refresh(commit)
repo_updatestamp = commit.repository.updatestamp
mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = (
jsonified_redis_queue
)
mock_redis.lists[
f"uploads/{commit.repoid}/{commit.commitid}"
] = jsonified_redis_queue
checkpoints = _create_checkpoint_logger(mocker)
kwargs = {_kwargs_key(UploadFlow): checkpoints.data}
result = UploadTask().run_impl(
Expand Down Expand Up @@ -299,9 +299,9 @@ def test_upload_task_call_test_results(
dbsession.flush()
dbsession.refresh(commit)

mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}/test_results"] = (
jsonified_redis_queue
)
mock_redis.lists[
f"uploads/{commit.repoid}/{commit.commitid}/test_results"
] = jsonified_redis_queue

UploadTask().run_impl(
dbsession,
Expand Down Expand Up @@ -335,6 +335,7 @@ def test_upload_task_call_test_results(
repoid=commit.repoid,
commitid=commit.commitid,
commit_yaml={"codecov": {"max_report_age": "1y ago"}},
checkpoints_TestResultsFlow=None,
)
)

Expand Down Expand Up @@ -408,9 +409,9 @@ def test_upload_task_upload_processing_delay_not_enough_delay(
)
dbsession.add(commit)
dbsession.flush()
mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = (
jsonified_redis_queue
)
mock_redis.lists[
f"uploads/{commit.repoid}/{commit.commitid}"
] = jsonified_redis_queue
mock_redis.keys[f"latest_upload/{commit.repoid}/{commit.commitid}"] = (
datetime.utcnow() - timedelta(seconds=10)
).timestamp()
Expand Down Expand Up @@ -548,9 +549,9 @@ def test_upload_task_call_multiple_processors(
)
dbsession.add(commit)
dbsession.flush()
mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = (
jsonified_redis_queue
)
mock_redis.lists[
f"uploads/{commit.repoid}/{commit.commitid}"
] = jsonified_redis_queue
result = UploadTask().run_impl(dbsession, commit.repoid, commit.commitid)
expected_result = {"was_setup": False, "was_updated": True}
assert expected_result == result
Expand Down Expand Up @@ -663,9 +664,9 @@ def test_upload_task_proper_parent(
dbsession.flush()
redis_queue = [{"build": "part1"}]
jsonified_redis_queue = [json.dumps(x) for x in redis_queue]
mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = (
jsonified_redis_queue
)
mock_redis.lists[
f"uploads/{commit.repoid}/{commit.commitid}"
] = jsonified_redis_queue
result = UploadTask().run_impl(dbsession, commit.repoid, commit.commitid)
expected_result = {"was_setup": False, "was_updated": True}
assert expected_result == result
Expand Down Expand Up @@ -710,9 +711,9 @@ def test_upload_task_no_bot(
)
dbsession.add(commit)
dbsession.flush()
mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = (
jsonified_redis_queue
)
mock_redis.lists[
f"uploads/{commit.repoid}/{commit.commitid}"
] = jsonified_redis_queue
result = UploadTask().run_impl(dbsession, commit.repoid, commit.commitid)
expected_result = {"was_setup": False, "was_updated": False}
assert expected_result == result
Expand Down Expand Up @@ -765,9 +766,9 @@ def test_upload_task_bot_no_permissions(
)
dbsession.add(commit)
dbsession.flush()
mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = (
jsonified_redis_queue
)
mock_redis.lists[
f"uploads/{commit.repoid}/{commit.commitid}"
] = jsonified_redis_queue
result = UploadTask().run_impl(dbsession, commit.repoid, commit.commitid)
expected_result = {"was_setup": False, "was_updated": False}
assert expected_result == result
Expand Down Expand Up @@ -820,9 +821,9 @@ def test_upload_task_bot_unauthorized(
mock_repo_provider.data = dict(repo=dict(repoid=commit.repoid))
dbsession.add(commit)
dbsession.flush()
mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = (
jsonified_redis_queue
)
mock_redis.lists[
f"uploads/{commit.repoid}/{commit.commitid}"
] = jsonified_redis_queue
upload_args = UploadContext(
repoid=commit.repoid,
commitid=commit.commitid,
Expand Down Expand Up @@ -920,9 +921,9 @@ def fail_if_try_to_create_upload(*args, **kwargs):
{"build": "part1", "url": "url1", "upload_id": upload.id_},
]
jsonified_redis_queue = [json.dumps(x) for x in redis_queue]
mock_redis.lists[f"uploads/{commit.repoid}/{commit.commitid}"] = (
jsonified_redis_queue
)
mock_redis.lists[
f"uploads/{commit.repoid}/{commit.commitid}"
] = jsonified_redis_queue
upload_args = UploadContext(
repoid=commit.repoid,
commitid=commit.commitid,
Expand Down
29 changes: 17 additions & 12 deletions tasks/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME
from helpers.checkpoint_logger import _kwargs_key
from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
from helpers.checkpoint_logger.flows import UploadFlow
from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow
from helpers.exceptions import RepositoryWithoutValidBotError
from helpers.github_installation import get_installation_name_for_owner_for_task
from helpers.parallel_upload_processing import get_parallel_session_ids
Expand Down Expand Up @@ -259,6 +259,10 @@ def run_impl(
checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs).log(
UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs, ignore_repeat=True
)
elif report_type == ReportType.TEST_RESULTS.value:
checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs).log(
TestResultsFlow.TEST_RESULTS_BEGIN, kwargs=kwargs, ignore_repeat=True
)

repoid = int(repoid)
log.info(
Expand Down Expand Up @@ -629,7 +633,7 @@ def schedule_task(
)
elif commit_report.report_type == ReportType.TEST_RESULTS.value:
res = self._schedule_test_results_processing_task(
commit, commit_yaml, argument_list, commit_report
commit, commit_yaml, argument_list, commit_report, checkpoints
)

if res:
Expand Down Expand Up @@ -851,11 +855,7 @@ def _schedule_bundle_analysis_processing_task(
return res

def _schedule_test_results_processing_task(
self,
commit,
commit_yaml,
argument_list,
commit_report,
self, commit, commit_yaml, argument_list, commit_report, checkpoints=None
):
processor_task_group = []
for i in range(0, len(argument_list), CHUNK_SIZE):
Expand All @@ -873,15 +873,20 @@ def _schedule_test_results_processing_task(
)
processor_task_group.append(sig)
if processor_task_group:
checkpoint_data = None
if checkpoints:
checkpoint_data = checkpoints.data

Check warning on line 878 in tasks/upload.py

View check run for this annotation

Codecov Public QA / codecov/patch

tasks/upload.py#L878

Added line #L878 was not covered by tests

Check warning on line 878 in tasks/upload.py

View check run for this annotation

Codecov Notifications / codecov/patch

tasks/upload.py#L878

Added line #L878 was not covered by tests

Check warning on line 878 in tasks/upload.py

View check run for this annotation

Codecov - QA / codecov/patch

tasks/upload.py#L878

Added line #L878 was not covered by tests

Check warning on line 878 in tasks/upload.py

View check run for this annotation

Codecov / codecov/patch

tasks/upload.py#L878

Added line #L878 was not covered by tests
kwargs = {
"repoid": commit.repoid,
"commitid": commit.commitid,
"commit_yaml": commit_yaml,
_kwargs_key(TestResultsFlow): checkpoint_data,
}
res = chord(
processor_task_group,
test_results_finisher_task.signature(
args=(),
kwargs=dict(
repoid=commit.repoid,
commitid=commit.commitid,
commit_yaml=commit_yaml,
),
kwargs=kwargs,
),
).apply_async()

Expand Down

0 comments on commit 7638bed

Please sign in to comment.