Skip to content

Commit

Permalink
Merge pull request #47 from codecov/matt/add-upload-flow-logs
Browse files Browse the repository at this point in the history
feat: add "flow logger" + instrument upload flow with it
  • Loading branch information
matt-codecov committed Aug 16, 2023
2 parents 76c780f + aacbd27 commit 5df32f9
Show file tree
Hide file tree
Showing 10 changed files with 608 additions and 53 deletions.
24 changes: 24 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,27 @@ def with_sql_functions(dbsession):
$$ language sql stable strict;"""
)
dbsession.flush()


# We don't want any tests submitting checkpoint logs to Sentry for real
@pytest.fixture(autouse=True)
def mock_checkpoint_submit(mocker, request):
    """Replace `CheckpointLogger.submit_subflow` with a no-op mock for each test.

    The fixture is autouse, so it applies to every test unless the test carries
    the `real_checkpoint_logger` marker, in which case nothing is patched.

    Returns:
        The result of `mocker.patch.object(...)` when the patch is installed,
        or `None` when the test opted out via the marker.
    """
    # We mock sentry differently in the tests for CheckpointLogger
    if request.node.get_closest_marker("real_checkpoint_logger"):
        return None

    def mock_submit_fn(metric, start, end):
        pass

    mock_submit = mocker.Mock()
    mock_submit.side_effect = mock_submit_fn

    # Import the submodule explicitly: a bare `import helpers` only exposes
    # `helpers.checkpoint_logger` if something else has already imported it.
    import helpers.checkpoint_logger

    return mocker.patch.object(
        helpers.checkpoint_logger.CheckpointLogger,
        "submit_subflow",
        mock_submit,
    )
98 changes: 98 additions & 0 deletions helpers/checkpoint_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import logging
import time
from enum import Enum, auto

import sentry_sdk
from shared.metrics import metrics

logger = logging.getLogger(__name__)


class UploadFlow(str, Enum):
    """Checkpoints that can be recorded for the upload flow.

    Declaration order matters: `CheckpointLogger` refuses to compute a
    duration whose end checkpoint does not come after its start checkpoint.
    """

    # NOTE(review): with the `str` mixin, `auto()` presumably yields the
    # string values "1", "2", ... rather than integers - confirm before
    # relying on numeric comparison of `.value`.
    UPLOAD_TASK_BEGIN = auto()
    PROCESSING_BEGIN = auto()
    INITIAL_PROCESSING_COMPLETE = auto()
    BATCH_PROCESSING_COMPLETE = auto()
    PROCESSING_COMPLETE = auto()
    NOTIFIED = auto()


def _get_milli_timestamp():
    """Return `time.time_ns()` truncated to whole milliseconds."""
    nanos_per_milli = 1_000_000
    now_ns = time.time_ns()
    return now_ns // nanos_per_milli


def _kwargs_key(cls):
    """Return the kwargs key under which checkpoints for flow `cls` are stored.

    The key is the literal prefix `checkpoints_` followed by the class name.
    """
    return "checkpoints_{}".format(cls.__name__)


def from_kwargs(cls, kwargs, strict=False):
    """Build a `CheckpointLogger` for flow `cls` from checkpoints stored in `kwargs`.

    Args:
        cls: The enum class describing the flow.
        kwargs: Mapping that may hold previously logged checkpoints under the
            key produced by `_kwargs_key(cls)`; a missing key means no data.
        strict: Forwarded to `CheckpointLogger`.

    Returns:
        A `CheckpointLogger` wrapping the stored checkpoint data.

    Raises:
        ValueError: If any stored checkpoint is not a member of `cls`.
    """
    stored = kwargs.get(_kwargs_key(cls), {})

    # Make sure these checkpoints were made with the same flow
    for recorded in stored.keys():
        if recorded in iter(cls):
            continue
        raise ValueError(f"Checkpoint {recorded} not part of flow `{cls.__name__}`")

    return CheckpointLogger(cls, stored, strict)


class CheckpointLogger:
    """Records timestamps for the checkpoints of a flow and reports durations.

    A flow is described by an enum class (`cls`); each member is a checkpoint.
    Timestamps are kept in `self.data`, keyed by checkpoint, and can be copied
    into a task's kwargs so that a later task can continue the same flow.

    Args:
        cls: The enum class describing the flow.
        data: Previously recorded checkpoints, if any. A falsy value (including
            an empty dict) results in a fresh dict being used.
        strict: When true, soft errors raise `ValueError` instead of being
            logged.
    """

    def __init__(self, cls, data=None, strict=False):
        self.cls = cls
        self.data = data if data else {}
        self.kwargs_key = _kwargs_key(self.cls)
        self.strict = strict

    def _error(self, msg):
        """Report a soft error: count it, then raise if strict, else log it."""
        # When a new version of worker rolls out, it will pick up tasks that
        # may have been enqueued by the old worker and be missing checkpoints
        # data. At least for that reason, we want to allow failing softly.
        metrics.incr("worker.checkpoint_logger.error")
        if self.strict:
            raise ValueError(msg)
        logger.error(msg)

    def _validate_checkpoint(self, checkpoint):
        """Raise `ValueError` if `checkpoint` does not belong to this flow."""
        if not isinstance(checkpoint, self.cls):
            # This error is not ignored when `self.strict==False` because it's definitely
            # a code mistake
            raise ValueError(
                f"Checkpoint {checkpoint} not part of flow `{self.cls.__name__}`"
            )

    def _subflow_duration(self, start, end):
        """Return the recorded time between `start` and `end`.

        Returns:
            `self.data[end] - self.data[start]`, or `None` when a checkpoint
            is missing and the logger is not strict.

        Raises:
            ValueError: If either checkpoint is from another flow, if `end`
                is not declared after `start`, or (strict mode only) if a
                checkpoint has not been recorded.
        """
        self._validate_checkpoint(start)
        self._validate_checkpoint(end)
        if start not in self.data:
            self._error(f"Cannot compute duration; missing start checkpoint {start}")
            return None
        if end not in self.data:
            self._error(f"Cannot compute duration; missing end checkpoint {end}")
            return None

        # Compare by declaration order rather than by `.value`: for enums with
        # a `str` mixin the values are strings, and "10" sorts before "2".
        flow_order = list(self.cls)
        if flow_order.index(end) <= flow_order.index(start):
            # This error is not ignored when `self.strict==False` because it's definitely
            # a code mistake
            raise ValueError(
                f"Cannot compute duration; end {end} is not after start {start}"
            )

        return self.data[end] - self.data[start]

    def log(self, checkpoint, ignore_repeat=False, kwargs=None):
        """Record the current time for `checkpoint`.

        Args:
            checkpoint: Member of this logger's flow to record.
            ignore_repeat: When true, logging an already-recorded checkpoint
                is silently ignored instead of being reported as an error.
            kwargs: Optional dict; if given, the checkpoint data is stored in
                it under `self.kwargs_key`.

        Returns:
            This logger, so calls can be chained.
        """
        if checkpoint not in self.data:
            self._validate_checkpoint(checkpoint)
            self.data[checkpoint] = _get_milli_timestamp()
        elif not ignore_repeat:
            self._error(f"Already recorded checkpoint {checkpoint}")

        if kwargs is not None:
            kwargs[self.kwargs_key] = self.data

        return self

    def submit_subflow(self, metric, start, end):
        """Submit the duration between `start` and `end` to Sentry as `metric`.

        Returns:
            This logger, so calls can be chained.
        """
        duration = self._subflow_duration(start, end)
        # A missing checkpoint in non-strict mode yields no duration; do not
        # submit a meaningless `None` measurement in that case.
        if duration is not None:
            sentry_sdk.set_measurement(metric, duration, "milliseconds")

        return self
196 changes: 196 additions & 0 deletions helpers/tests/unit/test_checkpoint_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import unittest
from enum import Enum, auto
from unittest.mock import ANY, patch

import pytest
import sentry_sdk

from helpers.checkpoint_logger import (
CheckpointLogger,
_get_milli_timestamp,
from_kwargs,
)


class TestEnum1(Enum):
    """Three-checkpoint flow that the tests below build their loggers around."""

    A = auto()
    B = auto()
    C = auto()


class TestEnum2(Enum):
    """Second flow with the same member names as `TestEnum1`.

    The tests pass its members to a `TestEnum1` logger to check that
    checkpoints from a different flow are rejected.
    """

    A = auto()
    B = auto()
    C = auto()


class TestCheckpointLogger(unittest.TestCase):
    """Unit tests for `CheckpointLogger`, `from_kwargs` and `_get_milli_timestamp`.

    Timestamps are made deterministic by patching `_get_milli_timestamp` (or
    `time.time_ns`). Tests that assert on `sentry_sdk.set_measurement` carry
    the `real_checkpoint_logger` marker so that `submit_subflow` is not
    replaced by the stub installed for the rest of the test suite.
    """

    @patch("time.time_ns", return_value=123456789)
    def test_get_milli_timestamp(self, mock_time_ns):
        expected_ms = 123456789 // 1000000
        self.assertEqual(_get_milli_timestamp(), expected_ms)

    @patch("helpers.checkpoint_logger._get_milli_timestamp", return_value=1337)
    def test_log_checkpoint(self, mock_timestamp):
        checkpoints = CheckpointLogger(TestEnum1)
        checkpoints.log(TestEnum1.A)

        self.assertEqual(checkpoints.data[TestEnum1.A], 1337)

    @patch(
        "helpers.checkpoint_logger._get_milli_timestamp",
        side_effect=[1337, 9001, 100000],
    )
    def test_log_multiple_checkpoints(self, mock_timestamp):
        checkpoints = CheckpointLogger(TestEnum1)
        checkpoints.log(TestEnum1.A)
        checkpoints.log(TestEnum1.B)
        checkpoints.log(TestEnum1.C)

        self.assertEqual(checkpoints.data[TestEnum1.A], 1337)
        self.assertEqual(checkpoints.data[TestEnum1.B], 9001)
        self.assertEqual(checkpoints.data[TestEnum1.C], 100000)

    def test_log_checkpoint_twice_throws(self):
        checkpoints = CheckpointLogger(TestEnum1, strict=True)
        checkpoints.log(TestEnum1.A)

        with self.assertRaises(ValueError):
            checkpoints.log(TestEnum1.A)

    def test_log_checkpoint_wrong_enum_throws(self):
        checkpoints = CheckpointLogger(TestEnum1, strict=True)

        with self.assertRaises(ValueError):
            checkpoints.log(TestEnum2.A)

    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
    def test_subflow_duration(self, mock_timestamp):
        checkpoints = CheckpointLogger(TestEnum1)
        checkpoints.log(TestEnum1.A)
        checkpoints.log(TestEnum1.B)

        duration = checkpoints._subflow_duration(TestEnum1.A, TestEnum1.B)
        self.assertEqual(duration, 9001 - 1337)

    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
    def test_subflow_duration_missing_checkpoints(self, mock_timestamp):
        checkpoints = CheckpointLogger(TestEnum1, strict=True)
        checkpoints.log(TestEnum1.A)
        checkpoints.log(TestEnum1.C)

        # Missing end checkpoint
        with self.assertRaises(ValueError):
            checkpoints._subflow_duration(TestEnum1.A, TestEnum1.B)

        # Missing start checkpoint
        with self.assertRaises(ValueError):
            checkpoints._subflow_duration(TestEnum1.B, TestEnum1.C)

    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
    def test_subflow_duration_wrong_order(self, mock_timestamp):
        checkpoints = CheckpointLogger(TestEnum1, strict=True)
        checkpoints.log(TestEnum1.A)
        checkpoints.log(TestEnum1.B)

        # End < start
        with self.assertRaises(ValueError):
            checkpoints._subflow_duration(TestEnum1.B, TestEnum1.A)

        # End == start
        with self.assertRaises(ValueError):
            checkpoints._subflow_duration(TestEnum1.A, TestEnum1.A)

    @patch("helpers.checkpoint_logger._get_milli_timestamp", return_value=1337)
    def test_subflow_duration_wrong_enum(self, mock_timestamp):
        checkpoints = CheckpointLogger(TestEnum1, strict=True)
        checkpoints.log(TestEnum1.A)

        # Wrong enum for start checkpoint
        with self.assertRaises(ValueError):
            checkpoints._subflow_duration(TestEnum2.A, TestEnum1.A)

        # Wrong enum for end checkpoint
        with self.assertRaises(ValueError):
            checkpoints._subflow_duration(TestEnum1.A, TestEnum2.A)

    @pytest.mark.real_checkpoint_logger
    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
    @patch("sentry_sdk.set_measurement")
    def test_submit_subflow(self, mock_sentry, mock_timestamp):
        checkpoints = CheckpointLogger(TestEnum1)
        checkpoints.log(TestEnum1.A)
        checkpoints.log(TestEnum1.B)

        expected_duration = 9001 - 1337
        checkpoints.submit_subflow("metricname", TestEnum1.A, TestEnum1.B)
        mock_sentry.assert_called_with("metricname", expected_duration, "milliseconds")

    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337])
    def test_log_ignore_repeat(self, mock_timestamp):
        checkpoints = CheckpointLogger(TestEnum1, strict=True)

        checkpoints.log(TestEnum1.A)
        first_timestamp = checkpoints.data[TestEnum1.A]

        # The repeated log must neither raise nor overwrite the first timestamp
        checkpoints.log(TestEnum1.A, ignore_repeat=True)
        assert checkpoints.data[TestEnum1.A] == first_timestamp

    def test_create_from_kwargs(self):
        good_data = {
            TestEnum1.A: 1337,
            TestEnum1.B: 9001,
        }
        good_kwargs = {
            "checkpoints_TestEnum1": good_data,
        }
        checkpoints = from_kwargs(TestEnum1, good_kwargs, strict=True)
        assert checkpoints.data == good_data

        # Data is from TestEnum2 but we expected TestEnum1
        bad_data = {
            TestEnum2.A: 1337,
            TestEnum2.B: 9001,
        }
        bad_kwargs = {
            "checkpoints_TestEnum1": bad_data,
        }
        with self.assertRaises(ValueError):
            checkpoints = from_kwargs(TestEnum1, bad_kwargs, strict=True)

    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337, 9001])
    def test_log_to_kwargs(self, mock_timestamp):
        kwargs = {}

        checkpoints = CheckpointLogger(TestEnum1)
        checkpoints.log(TestEnum1.A, kwargs=kwargs)
        assert "checkpoints_TestEnum1" in kwargs
        assert kwargs["checkpoints_TestEnum1"][TestEnum1.A] == 1337
        assert TestEnum1.B not in kwargs["checkpoints_TestEnum1"]

        checkpoints.log(TestEnum1.B, kwargs=kwargs)
        assert "checkpoints_TestEnum1" in kwargs
        assert kwargs["checkpoints_TestEnum1"][TestEnum1.A] == 1337
        assert kwargs["checkpoints_TestEnum1"][TestEnum1.B] == 9001

    @pytest.mark.real_checkpoint_logger
    @patch("sentry_sdk.set_measurement")
    @patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[9001])
    def test_create_log_oneliner(self, mock_timestamp, mock_sentry):
        kwargs = {
            "checkpoints_TestEnum1": {
                TestEnum1.A: 1337,
            },
        }

        expected_duration = 9001 - 1337

        from_kwargs(TestEnum1, kwargs, strict=True).log(
            TestEnum1.B, kwargs=kwargs
        ).submit_subflow("x", TestEnum1.A, TestEnum1.B)

        mock_sentry.assert_called_with("x", expected_duration, "milliseconds")
        assert kwargs["checkpoints_TestEnum1"][TestEnum1.A] == 1337
        assert kwargs["checkpoints_TestEnum1"][TestEnum1.B] == 9001
3 changes: 2 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
[pytest]
addopts = --sqlalchemy-connect-url="postgresql://postgres@postgres:5432/background_test"
markers=
integration: integration tests (includes tests with vcrs)
integration: integration tests (includes tests with vcrs)
real_checkpoint_logger: prevents use of stubbed CheckpointLogger
10 changes: 9 additions & 1 deletion tasks/notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from app import celery_app
from database.enums import CommitErrorTypes, Decoration
from database.models import Commit, Pull
from helpers.checkpoint_logger import UploadFlow
from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
from helpers.exceptions import RepositoryWithoutValidBotError
from helpers.save_commit_error import save_commit_error
from services.activation import activate_user
Expand All @@ -37,7 +39,6 @@


class NotifyTask(BaseCodecovTask):

name = notify_task_name

throws = (SoftTimeLimitExceeded,)
Expand Down Expand Up @@ -254,6 +255,13 @@ async def run_async_within_lock(
enriched_pull,
empty_upload,
)
checkpoints_from_kwargs(UploadFlow, kwargs).log(
UploadFlow.NOTIFIED
).submit_subflow(
"notification_latency",
UploadFlow.UPLOAD_TASK_BEGIN,
UploadFlow.NOTIFIED,
)
log.info(
"Notifications done",
extra=dict(
Expand Down
Loading

0 comments on commit 5df32f9

Please sign in to comment.