Skip to content

Commit

Permalink
feat: add TestResultsFlow for measuring time to notification
Browse files Browse the repository at this point in the history
Signed-off-by: joseph-sentry <[email protected]>
  • Loading branch information
joseph-sentry committed May 8, 2024
1 parent 9a02734 commit 3d33289
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 14 deletions.
14 changes: 14 additions & 0 deletions helpers/checkpoint_logger/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,17 @@ class UploadFlow(BaseFlow):
NOTIF_TOO_MANY_RETRIES = auto()
NOTIF_STALE_HEAD = auto()
NOTIF_ERROR_NO_REPORT = auto()


@failure_events("TEST_RESULTS_ERROR")
@success_events("TEST_RESULTS_BEGIN")
@subflows(
("notification_latency", "TEST_RESULTS_BEGIN", "TEST_RESULTS_NOTIFY"),
("notification_latency", "TEST_RESULTS_BEGIN", "FLAKE_DETECTION_NOTIFY"),
)
@reliability_counters
class TestResultsFlow(BaseFlow):
TEST_RESULTS_BEGIN = auto()
TEST_RESULTS_NOTIFY = auto()
FLAKE_DETECTION_NOTIFY = auto()
TEST_RESULTS_ERROR = auto()
11 changes: 10 additions & 1 deletion tasks/test_results_finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from app import celery_app
from database.enums import FlakeSymptomType, ReportType, TestResultsProcessingError
from database.models import Commit, TestResultReportTotals
from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
from helpers.checkpoint_logger.flows import TestResultsFlow
from helpers.string import EscapeEnum, Replacement, StringEscaper, shorten_file_paths
from rollouts import FLAKY_TEST_DETECTION
from services.failure_normalizer import FailureNormalizer
Expand Down Expand Up @@ -67,6 +69,7 @@ def run_impl(
commit_yaml: dict,
**kwargs,
):

repoid = int(repoid)
commit_yaml = UserYaml.from_dict(commit_yaml)

Expand Down Expand Up @@ -136,6 +139,8 @@ def process_impl_within_lock(
),
)

checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs)

commit: Commit = (
db_session.query(Commit).filter_by(repoid=repoid, commitid=commitid).first()
)
Expand Down Expand Up @@ -271,6 +276,7 @@ def process_impl_within_lock(
)

with metrics.timing("test_results.finisher.notification"):
checkpoints.log(TestResultsFlow.TEST_RESULTS_NOTIFY)
success, reason = async_to_sync(notifier.notify)(payload)

log.info(
Expand Down Expand Up @@ -302,7 +308,7 @@ def process_impl_within_lock(
)
with metrics.timing("test_results.finisher.run_flaky_test_detection"):
success, reason = self.run_flaky_test_detection(
db_session, repoid, notifier, payload
db_session, repoid, notifier, payload, checkpoints=checkpoints
)

metrics.incr(
Expand All @@ -322,6 +328,7 @@ def run_flaky_test_detection(
repoid,
notifier: TestResultsNotifier,
payload: TestResultsNotificationPayload,
checkpoints=None,
):
ignore_predefined = read_yaml_field(
"test_analytics", "ignore_predefined", _else=False
Expand Down Expand Up @@ -376,6 +383,8 @@ def run_flaky_test_detection(
)
db_session.flush()

if checkpoints:
checkpoints.log(TestResultsFlow.TEST_RESULTS_NOTIFY)
success, reason = async_to_sync(notifier.notify)(payload)
log.info(
"Added flaky test information to the PR comment",
Expand Down
1 change: 1 addition & 0 deletions tasks/tests/unit/test_upload_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ def test_upload_task_call_test_results(
repoid=commit.repoid,
commitid=commit.commitid,
commit_yaml={"codecov": {"max_report_age": "1y ago"}},
checkpoints_TestResultsFlow=None,
)
)

Expand Down
30 changes: 17 additions & 13 deletions tasks/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME
from helpers.checkpoint_logger import _kwargs_key
from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
from helpers.checkpoint_logger.flows import UploadFlow
from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow
from helpers.exceptions import RepositoryWithoutValidBotError
from helpers.github_installation import get_installation_name_for_owner_for_task
from helpers.metrics import metrics
Expand Down Expand Up @@ -258,6 +258,10 @@ def run_impl(
checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs).log(
UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs, ignore_repeat=True
)
elif report_type == ReportType.TEST_RESULTS.value:
checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs).log(
TestResultsFlow.TEST_RESULTS_BEGIN, kwargs=kwargs, ignore_repeat=True
)

repoid = int(repoid)
log.info(
Expand Down Expand Up @@ -628,7 +632,7 @@ def schedule_task(
)
elif commit_report.report_type == ReportType.TEST_RESULTS.value:
res = self._schedule_test_results_processing_task(
commit, commit_yaml, argument_list, commit_report
commit, commit_yaml, argument_list, commit_report, checkpoints
)

if res:
Expand Down Expand Up @@ -850,11 +854,7 @@ def _schedule_bundle_analysis_processing_task(
return res

def _schedule_test_results_processing_task(
self,
commit,
commit_yaml,
argument_list,
commit_report,
self, commit, commit_yaml, argument_list, commit_report, checkpoints=None
):
processor_task_group = []
for i in range(0, len(argument_list), CHUNK_SIZE):
Expand All @@ -872,16 +872,20 @@ def _schedule_test_results_processing_task(
)
processor_task_group.append(sig)
if processor_task_group:

checkpoint_data = None
if checkpoints:
checkpoint_data = checkpoints.data
kwargs = {
"repoid": commit.repoid,
"commitid": commit.commitid,
"commit_yaml": commit_yaml,
_kwargs_key(TestResultsFlow): checkpoint_data,
}
res = chord(
processor_task_group,
test_results_finisher_task.signature(
args=(),
kwargs=dict(
repoid=commit.repoid,
commitid=commit.commitid,
commit_yaml=commit_yaml,
),
kwargs=kwargs,
),
).apply_async()

Expand Down

0 comments on commit 3d33289

Please sign in to comment.