Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add "flow logger" + instrument upload flow with it #47

Merged
merged 4 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,27 @@ def with_sql_functions(dbsession):
$$ language sql stable strict;"""
)
dbsession.flush()


# We don't want any tests submitting checkpoint logs to Sentry for real
@pytest.fixture(autouse=True)
def mock_checkpoint_submit(mocker, request):
# We mock sentry differently in the tests for CheckpointLogger
if request.node.get_closest_marker("real_checkpoint_logger"):
return

def mock_submit_fn(metric, start, end):
pass

mock_submit = mocker.Mock()
mock_submit.side_effect = mock_submit_fn

import helpers

return mocker.patch.object(
helpers.checkpoint_logger.CheckpointLogger,
"submit_subflow",
mock_submit,
)

return mock_submit
55 changes: 55 additions & 0 deletions helpers/checkpoint_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import time
from enum import Enum, auto

import sentry_sdk


class UploadFlow(Enum):
UPLOAD_TASK_BEGIN = auto()
PROCESSING_BEGIN = auto()
INITIAL_PROCESSING_COMPLETE = auto()
BATCH_PROCESSING_COMPLETE = auto()
PROCESSING_COMPLETE = auto()
NOTIFIED = auto()


def _get_milli_timestamp():
return time.time_ns() // 1000000


class CheckpointLogger:
def __init__(self, cls):
self.cls = cls
self.data = {}

def _validate_checkpoint(self, checkpoint):
if checkpoint.__class__ != self.cls:
raise ValueError(
f"Checkpoint {checkpoint} not part of flow `{self.cls.__name__}`"
)

def _subflow_duration(self, start, end):
self._validate_checkpoint(start)
self._validate_checkpoint(end)
if start not in self.data:
raise ValueError(
f"Cannot compute duration; missing start checkpoint {start}"
)
elif end not in self.data:
raise ValueError(f"Cannot compute duration; missing end checkpoint {end}")
elif end.value <= start.value:
raise ValueError(
f"Cannot compute duration; end {end} is not after start {start}"
)

return self.data[end] - self.data[start]

def log(self, checkpoint):
if checkpoint in self.data:
raise ValueError(f"Already recorded checkpoint {checkpoint}")
self._validate_checkpoint(checkpoint)
self.data[checkpoint] = _get_milli_timestamp()

def submit_subflow(self, metric, start, end):
duration = self._subflow_duration(start, end)
sentry_sdk.set_measurement(metric, duration, "milliseconds")
123 changes: 123 additions & 0 deletions helpers/tests/unit/test_checkpoint_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import unittest
from enum import Enum, auto
from unittest.mock import ANY, patch

import pytest
import sentry_sdk

from helpers.checkpoint_logger import CheckpointLogger, _get_milli_timestamp


class TestEnum1(Enum):
A = auto()
B = auto()
C = auto()


class TestEnum2(Enum):
A = auto()
B = auto()
C = auto()


class TestCheckpointLogger(unittest.TestCase):
@patch("time.time_ns", return_value=123456789)
def test_get_milli_timestamp(self, mocker):
expected_ms = 123456789 // 1000000
self.assertEqual(_get_milli_timestamp(), expected_ms)

@patch("helpers.checkpoint_logger._get_milli_timestamp", return_value=1337)
def test_log_checkpoint(self, mocker):
checkpoints = CheckpointLogger(TestEnum1)
checkpoints.log(TestEnum1.A)

self.assertEqual(checkpoints.data[TestEnum1.A], 1337)

@patch(
"helpers.checkpoint_logger._get_milli_timestamp",
side_effect=[1337, 9001, 100000],
)
def test_log_multiple_checkpoints(self, mocker):
checkpoints = CheckpointLogger(TestEnum1)
checkpoints.log(TestEnum1.A)
checkpoints.log(TestEnum1.B)
checkpoints.log(TestEnum1.C)

self.assertEqual(checkpoints.data[TestEnum1.A], 1337)
self.assertEqual(checkpoints.data[TestEnum1.B], 9001)
self.assertEqual(checkpoints.data[TestEnum1.C], 100000)

def test_log_checkpoint_twice_throws(self):
checkpoints = CheckpointLogger(TestEnum1)
checkpoints.log(TestEnum1.A)

with self.assertRaises(ValueError):
checkpoints.log(TestEnum1.A)

def test_log_checkpoint_wrong_enum_throws(self):
checkpoints = CheckpointLogger(TestEnum1)

with self.assertRaises(ValueError):
checkpoints.log(TestEnum2.A)

@patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
def test_subflow_duration(self, mocker):
checkpoints = CheckpointLogger(TestEnum1)
checkpoints.log(TestEnum1.A)
checkpoints.log(TestEnum1.B)

duration = checkpoints._subflow_duration(TestEnum1.A, TestEnum1.B)
self.assertEqual(duration, 9001 - 1337)

@patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
def test_subflow_duration_missing_checkpoints(self, mocker):
checkpoints = CheckpointLogger(TestEnum1)
checkpoints.log(TestEnum1.A)
checkpoints.log(TestEnum1.C)

# Missing end checkpoint
with self.assertRaises(ValueError):
checkpoints._subflow_duration(TestEnum1.A, TestEnum1.B)

# Missing start checkpoint
with self.assertRaises(ValueError):
checkpoints._subflow_duration(TestEnum1.B, TestEnum1.C)

@patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
def test_subflow_duration_wrong_order(self, mocker):
checkpoints = CheckpointLogger(TestEnum1)
checkpoints.log(TestEnum1.A)
checkpoints.log(TestEnum1.B)

# End < start
with self.assertRaises(ValueError):
checkpoints._subflow_duration(TestEnum1.B, TestEnum1.A)

# End == start
with self.assertRaises(ValueError):
checkpoints._subflow_duration(TestEnum1.A, TestEnum1.A)

@patch("helpers.checkpoint_logger._get_milli_timestamp", return_value=1337)
def test_subflow_duration_wrong_enum(self, mocker):
checkpoints = CheckpointLogger(TestEnum1)
checkpoints.log(TestEnum1.A)

# Wrong enum for start checkpoint
with self.assertRaises(ValueError):
checkpoints._subflow_duration(TestEnum2.A, TestEnum1.A)

# Wrong enum for end checkpoint
with self.assertRaises(ValueError):
checkpoints._subflow_duration(TestEnum1.A, TestEnum2.A)

@pytest.mark.real_checkpoint_logger
@patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
@patch("sentry_sdk.set_measurement")
def test_submit_subflow(self, mock_sentry, mock_timestamp):
checkpoints = CheckpointLogger(TestEnum1)
checkpoints.log(TestEnum1.A)
checkpoints.log(TestEnum1.B)

expected_duration = 9001 - 1337
checkpoints.submit_subflow("metricname", TestEnum1.A, TestEnum1.B)
mock_sentry.assert_called_with("metricname", expected_duration, "milliseconds")
3 changes: 2 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[pytest]
addopts = --sqlalchemy-connect-url="postgresql://postgres@postgres:5432/background_test"
markers=
integration: integration tests (includes tests with vcrs)
integration: integration tests (includes tests with vcrs)
real_checkpoint_logger: prevents use of stubbed CheckpointLogger
12 changes: 11 additions & 1 deletion tasks/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from app import celery_app
from database.enums import CommitErrorTypes, Decoration
from database.models import Commit, Pull
from helpers.checkpoint_logger import UploadFlow
from helpers.exceptions import RepositoryWithoutValidBotError
from helpers.save_commit_error import save_commit_error
from services.activation import activate_user
Expand All @@ -37,7 +38,6 @@


class NotifyTask(BaseCodecovTask):

name = notify_task_name

throws = (SoftTimeLimitExceeded,)
Expand All @@ -50,6 +50,7 @@ async def run_async(
commitid: str,
current_yaml=None,
empty_upload=None,
checkpoints=None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this be an instance of CheckpointLogger? I believe all these task arguments need to be serializable in some way (not sure if JSON is used or something else) - so just wanted to make sure this was tested in some sort of E2E fashion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah great callout, i think the tests don't actually exercise passing this between tasks

i was at least planning to create a sentry project and try to get a dashboard populated end-to-end before merging, but i'll also see if i can write a proper integration test that passes this between tasks in CI

**kwargs,
):
redis_connection = get_redis_connection()
Expand Down Expand Up @@ -80,6 +81,7 @@ async def run_async(
commitid=commitid,
current_yaml=current_yaml,
empty_upload=empty_upload,
checkpoints=checkpoints,
**kwargs,
)
except LockError as err:
Expand All @@ -106,6 +108,7 @@ async def run_async_within_lock(
commitid: str,
current_yaml=None,
empty_upload=None,
checkpoints=None,
**kwargs,
):
log.info("Starting notifications", extra=dict(commit=commitid, repoid=repoid))
Expand Down Expand Up @@ -254,6 +257,13 @@ async def run_async_within_lock(
enriched_pull,
empty_upload,
)
if checkpoints:
checkpoints.log(UploadFlow.NOTIFIED)
checkpoints.submit_subflow(
"notification_latency",
UploadFlow.UPLOAD_TASK_BEGIN,
UploadFlow.NOTIFIED,
)
log.info(
"Notifications done",
extra=dict(
Expand Down
46 changes: 45 additions & 1 deletion tasks/tests/unit/test_notify_task.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from unittest.mock import call

import pytest
from celery.exceptions import MaxRetriesExceededError, Retry
from redis.exceptions import LockError
Expand All @@ -16,6 +18,7 @@
PullFactory,
RepositoryFactory,
)
from helpers.checkpoint_logger import CheckpointLogger, UploadFlow
from helpers.exceptions import RepositoryWithoutValidBotError
from services.decoration import DecorationDetails
from services.notification import NotificationService
Expand All @@ -28,6 +31,20 @@
from tasks.notify import NotifyTask


def _create_checkpoint_logger(mocker):
mocker.patch(
"helpers.checkpoint_logger._get_milli_timestamp",
side_effect=[1337, 9001, 10000, 15000, 20000, 25000],
)
checkpoints = CheckpointLogger(UploadFlow)
checkpoints.log(UploadFlow.UPLOAD_TASK_BEGIN)
checkpoints.log(UploadFlow.PROCESSING_BEGIN)
checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE)
checkpoints.log(UploadFlow.BATCH_PROCESSING_COMPLETE)
checkpoints.log(UploadFlow.PROCESSING_COMPLETE)
return checkpoints


@pytest.fixture
def enriched_pull(dbsession):
repository = RepositoryFactory.create(
Expand Down Expand Up @@ -356,7 +373,12 @@ async def test_simple_call_no_notifications_commit_differs_from_pulls_head(

@pytest.mark.asyncio
async def test_simple_call_yes_notifications_no_base(
self, dbsession, mocker, mock_storage, mock_configuration
self,
dbsession,
mocker,
mock_storage,
mock_configuration,
mock_checkpoint_submit,
):
fake_notifier = mocker.MagicMock(
AbstractBaseNotifier,
Expand Down Expand Up @@ -396,12 +418,16 @@ async def test_simple_call_yes_notifications_no_base(
commit = CommitFactory.create(message="", pullid=None)
dbsession.add(commit)
dbsession.flush()

checkpoints = _create_checkpoint_logger(mocker)

task = NotifyTask()
result = await task.run_async_within_lock(
dbsession,
repoid=commit.repoid,
commitid=commit.commitid,
current_yaml={"coverage": {"status": {"patch": True}}},
checkpoints=checkpoints,
)
print(result)
expected_result = {
Expand All @@ -428,6 +454,23 @@ async def test_simple_call_yes_notifications_no_base(
dbsession.refresh(commit)
assert commit.notified is True

assert checkpoints.data == {
UploadFlow.UPLOAD_TASK_BEGIN: 1337,
UploadFlow.PROCESSING_BEGIN: 9001,
UploadFlow.INITIAL_PROCESSING_COMPLETE: 10000,
UploadFlow.BATCH_PROCESSING_COMPLETE: 15000,
UploadFlow.PROCESSING_COMPLETE: 20000,
UploadFlow.NOTIFIED: 25000,
}
calls = [
call(
"notification_latency",
UploadFlow.UPLOAD_TASK_BEGIN,
UploadFlow.NOTIFIED,
),
]
mock_checkpoint_submit.assert_has_calls(calls)

@pytest.mark.asyncio
async def test_simple_call_no_pullrequest_found(
self, dbsession, mocker, mock_storage, mock_configuration
Expand Down Expand Up @@ -877,4 +920,5 @@ async def test_run_async_can_run_logic(self, dbsession, mock_redis, mocker):
commitid=commit.commitid,
current_yaml=current_yaml,
empty_upload=None,
checkpoints=mocker.ANY,
)
Loading
Loading