Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add "flow logger" + instrument upload flow with it #47

Merged
merged 4 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,27 @@ def with_sql_functions(dbsession):
$$ language sql stable strict;"""
)
dbsession.flush()


# We don't want any tests submitting checkpoint logs to Sentry for real
@pytest.fixture(autouse=True)
def mock_checkpoint_submit(mocker, request):
    """Stub ``CheckpointLogger.submit_subflow`` for every test.

    Tests marked with ``real_checkpoint_logger`` opt out of the stub and
    mock Sentry themselves.

    Returns the patch's mock so callers can assert on invocations, or
    ``None`` when the opt-out marker is present.
    """
    # We mock sentry differently in the tests for CheckpointLogger
    if request.node.get_closest_marker("real_checkpoint_logger"):
        return None

    def mock_submit_fn(metric, start, end):
        pass

    mock_submit = mocker.Mock()
    mock_submit.side_effect = mock_submit_fn

    # Import the submodule explicitly: `import helpers` alone only binds the
    # package object and does not guarantee `helpers.checkpoint_logger` is an
    # attribute unless something else imported it first.
    import helpers.checkpoint_logger

    # The original had a second, unreachable `return mock_submit` after this
    # return; it has been removed.
    return mocker.patch.object(
        helpers.checkpoint_logger.CheckpointLogger,
        "submit_subflow",
        mock_submit,
    )
98 changes: 98 additions & 0 deletions helpers/checkpoint_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import logging
import time
from enum import Enum, auto

import sentry_sdk
from shared.metrics import metrics

logger = logging.getLogger(__name__)


class UploadFlow(str, Enum):
    """Named checkpoints in the upload -> processing -> notification flow.

    Members are logged by CheckpointLogger to measure durations between
    stages (e.g. UPLOAD_TASK_BEGIN to NOTIFIED for notification latency).
    """

    # NOTE(review): with a `str` mixin, `auto()` still generates integer
    # counts, so the members' string content is "1", "2", ... — presumably
    # only member identity and ordering matter here, not the string value;
    # confirm before serializing these anywhere.
    UPLOAD_TASK_BEGIN = auto()
    PROCESSING_BEGIN = auto()
    INITIAL_PROCESSING_COMPLETE = auto()
    BATCH_PROCESSING_COMPLETE = auto()
    PROCESSING_COMPLETE = auto()
    NOTIFIED = auto()


def _get_milli_timestamp():
return time.time_ns() // 1000000


def _kwargs_key(cls):
return f"checkpoints_{cls.__name__}"


def from_kwargs(cls, kwargs, strict=False):
    """Rebuild a CheckpointLogger for flow `cls` from Celery task kwargs.

    Absent checkpoint data yields an empty logger. Raises ValueError when
    the stored checkpoints belong to a different flow enum.
    """
    data = kwargs.get(_kwargs_key(cls), {})

    # Make sure these checkpoints were made with the same flow
    members = set(cls)
    for key in data:
        if key not in members:
            raise ValueError(f"Checkpoint {key} not part of flow `{cls.__name__}`")

    return CheckpointLogger(cls, data, strict)


class CheckpointLogger:
def __init__(self, cls, data=None, strict=False):
    """Create a checkpoint logger for flow enum `cls`.

    `data` seeds previously recorded checkpoints; any falsy value is
    replaced with a fresh dict. `strict` makes soft errors raise instead
    of being logged.
    """
    self.cls = cls
    self.data = data or {}
    self.kwargs_key = _kwargs_key(self.cls)
    self.strict = strict

def _error(self, msg):
    """Report a checkpoint-consistency problem.

    Raises ValueError in strict mode; otherwise logs and returns None.
    """
    # When a new version of worker rolls out, it will pick up tasks that
    # may have been enqueued by the old worker and be missing checkpoints
    # data. At least for that reason, we want to allow failing softly.
    metrics.incr("worker.checkpoint_logger.error")
    if not self.strict:
        logger.error(msg)
        return
    raise ValueError(msg)

Check warning on line 54 in helpers/checkpoint_logger.py

View check run for this annotation

Codecov / codecov/patch

helpers/checkpoint_logger.py#L54

Added line #L54 was not covered by tests

def _validate_checkpoint(self, checkpoint):
if checkpoint.__class__ != self.cls:
# This error is not ignored when `self.strict==False` because it's definitely
# a code mistake
raise ValueError(
f"Checkpoint {checkpoint} not part of flow `{self.cls.__name__}`"
)

def _subflow_duration(self, start, end):
self._validate_checkpoint(start)
self._validate_checkpoint(end)
if start not in self.data:
return self._error(
f"Cannot compute duration; missing start checkpoint {start}"
)
elif end not in self.data:
return self._error(f"Cannot compute duration; missing end checkpoint {end}")
elif end.value <= start.value:
# This error is not ignored when `self.strict==False` because it's definitely
# a code mistake
raise ValueError(
f"Cannot compute duration; end {end} is not after start {start}"
)

return self.data[end] - self.data[start]

def log(self, checkpoint, ignore_repeat=False, kwargs=None):
    """Record `checkpoint` at the current millisecond timestamp.

    Logging an already-recorded checkpoint is an error unless
    `ignore_repeat` is set. When a task `kwargs` dict is supplied, the
    checkpoint data is stored into it under this flow's key so it can be
    forwarded to the next task. Returns self to allow chaining.
    """
    if checkpoint in self.data:
        if not ignore_repeat:
            self._error(f"Already recorded checkpoint {checkpoint}")
    else:
        self._validate_checkpoint(checkpoint)
        self.data[checkpoint] = _get_milli_timestamp()

    if kwargs is not None:
        kwargs[self.kwargs_key] = self.data

    return self

def submit_subflow(self, metric, start, end):
    """Measure the start->end duration and report it to Sentry as `metric`.

    Returns self to allow chaining.
    """
    elapsed = self._subflow_duration(start, end)
    sentry_sdk.set_measurement(metric, elapsed, "milliseconds")
    return self
196 changes: 196 additions & 0 deletions helpers/tests/unit/test_checkpoint_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import unittest
from enum import Enum, auto
from unittest.mock import ANY, patch

import pytest
import sentry_sdk

from helpers.checkpoint_logger import (
CheckpointLogger,
_get_milli_timestamp,
from_kwargs,
)


# Minimal flow enum used as a stand-in for real flows (e.g. UploadFlow).
# NOTE(review): the `Test*` name may trigger a pytest collection warning
# since pytest tries to collect Test-prefixed classes — confirm and consider
# `__test__ = False` if noisy.
class TestEnum1(Enum):
    A = auto()
    B = auto()
    C = auto()


# Second, structurally identical flow enum: used to verify that checkpoints
# from one flow are rejected by a logger created for another flow.
class TestEnum2(Enum):
    A = auto()
    B = auto()
    C = auto()


class TestCheckpointLogger(unittest.TestCase):
    """Unit tests for CheckpointLogger, from_kwargs, and _get_milli_timestamp."""

    @patch("time.time_ns", return_value=123456789)
    def test_get_milli_timestamp(self, mocker):
        expected_ms = 123456789 // 1000000
        self.assertEqual(_get_milli_timestamp(), expected_ms)

    @patch("helpers.checkpoint_logger._get_milli_timestamp", return_value=1337)
    def test_log_checkpoint(self, mocker):
        checkpoints = CheckpointLogger(TestEnum1)
        checkpoints.log(TestEnum1.A)

        self.assertEqual(checkpoints.data[TestEnum1.A], 1337)

    @patch(
        "helpers.checkpoint_logger._get_milli_timestamp",
        side_effect=[1337, 9001, 100000],
    )
    def test_log_multiple_checkpoints(self, mocker):
        checkpoints = CheckpointLogger(TestEnum1)
        checkpoints.log(TestEnum1.A)
        checkpoints.log(TestEnum1.B)
        checkpoints.log(TestEnum1.C)

        self.assertEqual(checkpoints.data[TestEnum1.A], 1337)
        self.assertEqual(checkpoints.data[TestEnum1.B], 9001)
        self.assertEqual(checkpoints.data[TestEnum1.C], 100000)

    # Fixed typo in method name: was `test_log_checkpoint_twice_ahrows`
    def test_log_checkpoint_twice_throws(self):
        checkpoints = CheckpointLogger(TestEnum1, strict=True)
        checkpoints.log(TestEnum1.A)

        with self.assertRaises(ValueError):
            checkpoints.log(TestEnum1.A)

    def test_log_checkpoint_wrong_enum_throws(self):
        checkpoints = CheckpointLogger(TestEnum1, strict=True)

        with self.assertRaises(ValueError):
            checkpoints.log(TestEnum2.A)

    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
    def test_subflow_duration(self, mocker):
        checkpoints = CheckpointLogger(TestEnum1)
        checkpoints.log(TestEnum1.A)
        checkpoints.log(TestEnum1.B)

        duration = checkpoints._subflow_duration(TestEnum1.A, TestEnum1.B)
        self.assertEqual(duration, 9001 - 1337)

    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
    def test_subflow_duration_missing_checkpoints(self, mocker):
        checkpoints = CheckpointLogger(TestEnum1, strict=True)
        checkpoints.log(TestEnum1.A)
        checkpoints.log(TestEnum1.C)

        # Missing end checkpoint
        with self.assertRaises(ValueError):
            checkpoints._subflow_duration(TestEnum1.A, TestEnum1.B)

        # Missing start checkpoint
        with self.assertRaises(ValueError):
            checkpoints._subflow_duration(TestEnum1.B, TestEnum1.C)

    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
    def test_subflow_duration_wrong_order(self, mocker):
        checkpoints = CheckpointLogger(TestEnum1, strict=True)
        checkpoints.log(TestEnum1.A)
        checkpoints.log(TestEnum1.B)

        # End < start
        with self.assertRaises(ValueError):
            checkpoints._subflow_duration(TestEnum1.B, TestEnum1.A)

        # End == start
        with self.assertRaises(ValueError):
            checkpoints._subflow_duration(TestEnum1.A, TestEnum1.A)

    @patch("helpers.checkpoint_logger._get_milli_timestamp", return_value=1337)
    def test_subflow_duration_wrong_enum(self, mocker):
        checkpoints = CheckpointLogger(TestEnum1, strict=True)
        checkpoints.log(TestEnum1.A)

        # Wrong enum for start checkpoint
        with self.assertRaises(ValueError):
            checkpoints._subflow_duration(TestEnum2.A, TestEnum1.A)

        # Wrong enum for end checkpoint
        with self.assertRaises(ValueError):
            checkpoints._subflow_duration(TestEnum1.A, TestEnum2.A)

    @pytest.mark.real_checkpoint_logger
    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
    @patch("sentry_sdk.set_measurement")
    def test_submit_subflow(self, mock_sentry, mock_timestamp):
        checkpoints = CheckpointLogger(TestEnum1)
        checkpoints.log(TestEnum1.A)
        checkpoints.log(TestEnum1.B)

        expected_duration = 9001 - 1337
        checkpoints.submit_subflow("metricname", TestEnum1.A, TestEnum1.B)
        mock_sentry.assert_called_with("metricname", expected_duration, "milliseconds")

    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337])
    def test_log_ignore_repeat(self, mock_timestamp):
        checkpoints = CheckpointLogger(TestEnum1, strict=True)

        checkpoints.log(TestEnum1.A)
        time = checkpoints.data[TestEnum1.A]

        checkpoints.log(TestEnum1.A, ignore_repeat=True)
        assert checkpoints.data[TestEnum1.A] == time

    def test_create_from_kwargs(self):
        good_data = {
            TestEnum1.A: 1337,
            TestEnum1.B: 9001,
        }
        good_kwargs = {
            "checkpoints_TestEnum1": good_data,
        }
        checkpoints = from_kwargs(TestEnum1, good_kwargs, strict=True)
        assert checkpoints.data == good_data

        # Data is from TestEnum2 but we expected TestEnum1
        bad_data = {
            TestEnum2.A: 1337,
            TestEnum2.B: 9001,
        }
        bad_kwargs = {
            "checkpoints_TestEnum1": bad_data,
        }
        with self.assertRaises(ValueError):
            checkpoints = from_kwargs(TestEnum1, bad_kwargs, strict=True)

    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
    def test_log_to_kwargs(self, mock_timestamp):
        kwargs = {}

        checkpoints = CheckpointLogger(TestEnum1)
        checkpoints.log(TestEnum1.A, kwargs=kwargs)
        assert "checkpoints_TestEnum1" in kwargs
        assert kwargs["checkpoints_TestEnum1"][TestEnum1.A] == 1337
        assert TestEnum1.B not in kwargs["checkpoints_TestEnum1"]

        checkpoints.log(TestEnum1.B, kwargs=kwargs)
        assert "checkpoints_TestEnum1" in kwargs
        assert kwargs["checkpoints_TestEnum1"][TestEnum1.A] == 1337
        assert kwargs["checkpoints_TestEnum1"][TestEnum1.B] == 9001
        # Removed a stray trailing `pass` statement that was dead code here.

    @pytest.mark.real_checkpoint_logger
    @patch("sentry_sdk.set_measurement")
    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[9001])
    def test_create_log_oneliner(self, mock_timestamp, mock_sentry):
        kwargs = {
            "checkpoints_TestEnum1": {
                TestEnum1.A: 1337,
            },
        }

        expected_duration = 9001 - 1337

        from_kwargs(TestEnum1, kwargs, strict=True).log(
            TestEnum1.B, kwargs=kwargs
        ).submit_subflow("x", TestEnum1.A, TestEnum1.B)

        mock_sentry.assert_called_with("x", expected_duration, "milliseconds")
        assert kwargs["checkpoints_TestEnum1"][TestEnum1.A] == 1337
        assert kwargs["checkpoints_TestEnum1"][TestEnum1.B] == 9001
3 changes: 2 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[pytest]
addopts = --sqlalchemy-connect-url="postgresql://postgres@postgres:5432/background_test"
markers=
integration: integration tests (includes tests with vcrs)
integration: integration tests (includes tests with vcrs)
real_checkpoint_logger: prevents use of stubbed CheckpointLogger
10 changes: 9 additions & 1 deletion tasks/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from app import celery_app
from database.enums import CommitErrorTypes, Decoration
from database.models import Commit, Pull
from helpers.checkpoint_logger import UploadFlow
from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
from helpers.exceptions import RepositoryWithoutValidBotError
from helpers.save_commit_error import save_commit_error
from services.activation import activate_user
Expand All @@ -37,7 +39,6 @@


class NotifyTask(BaseCodecovTask):

name = notify_task_name

throws = (SoftTimeLimitExceeded,)
Expand Down Expand Up @@ -254,6 +255,13 @@ async def run_async_within_lock(
enriched_pull,
empty_upload,
)
checkpoints_from_kwargs(UploadFlow, kwargs).log(
UploadFlow.NOTIFIED
).submit_subflow(
"notification_latency",
UploadFlow.UPLOAD_TASK_BEGIN,
UploadFlow.NOTIFIED,
)
log.info(
"Notifications done",
extra=dict(
Expand Down
Loading
Loading