feat: add prometheus instrumentation to checkpointlogger #159

Merged · 3 commits · Oct 26, 2023
Changes from 2 commits
51 changes: 50 additions & 1 deletion helpers/checkpoint_logger/__init__.py
@@ -17,7 +17,7 @@
)

import sentry_sdk
from shared.metrics import metrics
from shared.metrics import Counter, Histogram, metrics

logger = logging.getLogger(__name__)

@@ -26,6 +26,45 @@
TSubflows: TypeAlias = Mapping[T, Iterable[tuple[str, T]]]


CHECKPOINTS_TOTAL_BEGUN = Counter(
    "worker_checkpoints_begun",
    "Total number of times a flow's first checkpoint was logged.",
    ["flow"],
)
CHECKPOINTS_TOTAL_SUCCEEDED = Counter(
    "worker_checkpoints_succeeded",
    "Total number of times one of a flow's success checkpoints was logged.",
    ["flow"],
)
CHECKPOINTS_TOTAL_FAILED = Counter(
    "worker_checkpoints_failed",
    "Total number of times one of a flow's failure checkpoints was logged.",
    ["flow"],
)
CHECKPOINTS_TOTAL_ENDED = Counter(
    "worker_checkpoints_ended",
    "Total number of times one of a flow's terminal checkpoints (success or failure) was logged.",
    ["flow"],
)
CHECKPOINTS_ERRORS = Counter(
    "worker_checkpoints_errors",
    "Total number of errors while trying to log checkpoints",
    ["flow"],
)
CHECKPOINTS_EVENTS = Counter(
    "worker_checkpoints_events",
    "Total number of checkpoints logged.",
    ["flow", "checkpoint"],
)

CHECKPOINTS_SUBFLOW_DURATION = Histogram(
    "worker_checkpoints_subflow_duration_seconds",
    "Duration of subflows in seconds.",
    ["flow", "subflow"],
    buckets=[0.05, 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 180, 300, 600, 900],
)
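
A minimal usage sketch (not part of the diff) of how these metric objects are exercised, assuming Counter and Histogram here are prometheus_client's classes re-exported by shared.metrics; the "UploadFlow" and "time_to_notify" label values are placeholders:

# Each distinct label combination creates its own time series.
CHECKPOINTS_TOTAL_BEGUN.labels(flow="UploadFlow").inc()
CHECKPOINTS_EVENTS.labels(flow="UploadFlow", checkpoint="BEGIN").inc()
# Histogram observations are in seconds, matching the bucket values above.
CHECKPOINTS_SUBFLOW_DURATION.labels(flow="UploadFlow", subflow="time_to_notify").observe(1.25)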


class BaseFlow(str, Enum):
"""
Base class for a flow. Defines optional functions which are added by the
@@ -266,10 +305,12 @@ class MyEnum(str, Enum):

def log_counters(obj: T) -> None:
    metrics.incr(f"{klass.__name__}.events.{obj.name}")
    CHECKPOINTS_EVENTS.labels(flow=klass.__name__, checkpoint=obj.name).inc()

    # If this is the first checkpoint, increment the number of flows we've begun
    if obj == next(iter(klass.__members__.values())):
        metrics.incr(f"{klass.__name__}.total.begun")
        CHECKPOINTS_TOTAL_BEGUN.labels(flow=klass.__name__).inc()
        return

    is_failure = hasattr(obj, "is_failure") and obj.is_failure()
@@ -278,11 +319,14 @@ def log_counters(obj: T) -> None:

    if is_failure:
        metrics.incr(f"{klass.__name__}.total.failed")
        CHECKPOINTS_TOTAL_FAILED.labels(flow=klass.__name__).inc()
    elif is_success:
        metrics.incr(f"{klass.__name__}.total.succeeded")
        CHECKPOINTS_TOTAL_SUCCEEDED.labels(flow=klass.__name__).inc()

    if is_terminal:
        metrics.incr(f"{klass.__name__}.total.ended")
        CHECKPOINTS_TOTAL_ENDED.labels(flow=klass.__name__).inc()

klass.log_counters = log_counters
return klass
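
The "begun" branch above keys off declaration order: next(iter(klass.__members__.values())) returns the first member defined on the enum, so only that checkpoint bumps the begun counter. A short illustration with a hypothetical enum (not from this repo):

from enum import Enum

class ExampleFlow(Enum):
    FIRST = 1
    SECOND = 2

# __members__ preserves definition order, so the first declared member is the
# one treated as the flow's starting checkpoint.
assert next(iter(ExampleFlow.__members__.values())) is ExampleFlow.FIRST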
@@ -356,6 +400,7 @@ def _error(self: _Self, msg: str) -> None:
    # may have been enqueued by the old worker and be missing checkpoints
    # data. At least for that reason, we want to allow failing softly.
    metrics.incr("worker.checkpoint_logger.error")
    CHECKPOINTS_ERRORS.labels(flow=self.cls.__name__).inc()
    if self.strict:
        raise ValueError(msg)
    else:
@@ -422,6 +467,10 @@ def submit_subflow(self: _Self, metric: str, start: T, end: T) -> _Self:
    duration = self._subflow_duration(start, end)
    if duration:
        sentry_sdk.set_measurement(metric, duration, "milliseconds")
        duration_in_seconds = duration / 1000
        CHECKPOINTS_SUBFLOW_DURATION.labels(
            flow=self.cls.__name__, subflow=metric
        ).observe(duration_in_seconds)

    return self
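
A small sketch (hypothetical values, not part of the diff) of the unit handling in submit_subflow: Sentry receives the raw millisecond duration, while the Prometheus histogram is defined in seconds, so the value is divided by 1000 before observe():

duration = 9001 - 1337  # milliseconds, as _subflow_duration would return
duration_in_seconds = duration / 1000
CHECKPOINTS_SUBFLOW_DURATION.labels(flow="TestEnum1", subflow="metricname").observe(
    duration_in_seconds
)
# prometheus_client exposes the observation through the _sum/_count/_bucket series,
# e.g. worker_checkpoints_subflow_duration_seconds_sum{flow="TestEnum1",subflow="metricname"}.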

159 changes: 154 additions & 5 deletions helpers/tests/unit/test_checkpoint_logger.py
@@ -5,6 +5,7 @@

import pytest
import sentry_sdk
from prometheus_client import REGISTRY
from shared.utils.test_utils import mock_metrics

from helpers.checkpoint_logger import (
@@ -19,6 +20,42 @@
)


class CounterAssertion:
    def __init__(self, metric, labels, expected_value):
        self.metric = metric
        self.labels = labels
        self.expected_value = expected_value

        self.before_value = None
        self.after_value = None

    def __repr__(self):
        return f"<CounterAssertion: {self.metric} {self.labels}>"

Check warning (Codecov / codecov/patch) on line 33 in helpers/tests/unit/test_checkpoint_logger.py: Added line #L33 was not covered by tests.


class CounterAssertionSet:
    def __init__(self, counter_assertions):
        self.counter_assertions = counter_assertions

    def __enter__(self):
        for assertion in self.counter_assertions:
            assertion.before_value = (
                REGISTRY.get_sample_value(assertion.metric, labels=assertion.labels)
                or 0
            )

    def __exit__(self, exc_type, exc_value, exc_tb):
        for assertion in self.counter_assertions:
            assertion.after_value = (
                REGISTRY.get_sample_value(assertion.metric, labels=assertion.labels)
                or 0
            )
            assert (
                assertion.after_value - assertion.before_value
                == assertion.expected_value
            )

@failure_events("BRANCH_1_FAIL")
@success_events("BRANCH_1_SUCCESS", "BRANCH_2_SUCCESS")
@subflows(
@@ -161,6 +198,13 @@
    expected_duration = 9001 - 1337
    checkpoints.submit_subflow("metricname", TestEnum1.A, TestEnum1.B)
    mock_sentry.assert_called_with("metricname", expected_duration, "milliseconds")
    assert (
        REGISTRY.get_sample_value(
            "worker_checkpoints_subflow_duration_seconds_sum",
            labels={"flow": "TestEnum1", "subflow": "metricname"},
        )
        == expected_duration / 1000
    )

@patch("helpers.checkpoint_logger._get_milli_timestamp", side_effect=[1337])
def test_log_ignore_repeat(self, mock_timestamp):
@@ -311,39 +355,144 @@

    checkpoints = CheckpointLogger(DecoratedEnum)

    checkpoints.log(DecoratedEnum.BEGIN)
    counter_assertions = [
        CounterAssertion(
            "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 1
        ),
        CounterAssertion(
            "worker_checkpoints_events_total",
            {"flow": "DecoratedEnum", "checkpoint": "BEGIN"},
            1,
        ),
        CounterAssertion(
            "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 0
        ),
        CounterAssertion(
            "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 0
        ),
        CounterAssertion(
            "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 0
        ),
    ]
    with CounterAssertionSet(counter_assertions):
        checkpoints.log(DecoratedEnum.BEGIN)
    assert metrics.data["DecoratedEnum.events.BEGIN"] == 1
    assert metrics.data["DecoratedEnum.total.begun"] == 1
    assert "DecoratedEnum.total.succeeded" not in metrics.data
    assert "DecoratedEnum.total.failed" not in metrics.data
    assert "DecoratedEnum.total.ended" not in metrics.data

    # Nothing special about `CHECKPOINT` - only its own event counter should change
    checkpoints.log(DecoratedEnum.CHECKPOINT)
    counter_assertions = [
        CounterAssertion(
            "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0
        ),
        CounterAssertion(
            "worker_checkpoints_events_total",
            {"flow": "DecoratedEnum", "checkpoint": "BEGIN"},
            0,
        ),
        CounterAssertion(
            "worker_checkpoints_events_total",
            {"flow": "DecoratedEnum", "checkpoint": "CHECKPOINT"},
            1,
        ),
        CounterAssertion(
            "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 0
        ),
        CounterAssertion(
            "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 0
        ),
        CounterAssertion(
            "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 0
        ),
    ]
    with CounterAssertionSet(counter_assertions):
        checkpoints.log(DecoratedEnum.CHECKPOINT)
    assert metrics.data["DecoratedEnum.events.CHECKPOINT"] == 1
    assert metrics.data["DecoratedEnum.total.begun"] == 1
    assert "DecoratedEnum.total.succeeded" not in metrics.data
    assert "DecoratedEnum.total.failed" not in metrics.data
    assert "DecoratedEnum.total.ended" not in metrics.data

    # Failures should increment both `failed` and `ended`
    checkpoints.log(DecoratedEnum.BRANCH_1_FAIL)
    counter_assertions = [
        CounterAssertion(
            "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0
        ),
        CounterAssertion(
            "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 0
        ),
        CounterAssertion(
            "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 1
        ),
        CounterAssertion(
            "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 1
        ),
        CounterAssertion(
            "worker_checkpoints_events_total",
            {"flow": "DecoratedEnum", "checkpoint": "BRANCH_1_FAIL"},
            1,
        ),
    ]
    with CounterAssertionSet(counter_assertions):
        checkpoints.log(DecoratedEnum.BRANCH_1_FAIL)
    assert metrics.data["DecoratedEnum.events.BRANCH_1_FAIL"] == 1
    assert metrics.data["DecoratedEnum.total.begun"] == 1
    assert metrics.data["DecoratedEnum.total.failed"] == 1
    assert metrics.data["DecoratedEnum.total.ended"] == 1
    assert "DecoratedEnum.total.succeeded" not in metrics.data

    # Successes should increment both `succeeded` and `ended`
    checkpoints.log(DecoratedEnum.BRANCH_1_SUCCESS)
    counter_assertions = [
        CounterAssertion(
            "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0
        ),
        CounterAssertion(
            "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 1
        ),
        CounterAssertion(
            "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 0
        ),
        CounterAssertion(
            "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 1
        ),
        CounterAssertion(
            "worker_checkpoints_events_total",
            {"flow": "DecoratedEnum", "checkpoint": "BRANCH_1_SUCCESS"},
            1,
        ),
    ]
    with CounterAssertionSet(counter_assertions):
        checkpoints.log(DecoratedEnum.BRANCH_1_SUCCESS)
    assert metrics.data["DecoratedEnum.events.BRANCH_1_SUCCESS"] == 1
    assert metrics.data["DecoratedEnum.total.begun"] == 1
    assert metrics.data["DecoratedEnum.total.failed"] == 1
    assert metrics.data["DecoratedEnum.total.ended"] == 2
    assert metrics.data["DecoratedEnum.total.succeeded"] == 1

    # A different success path should also increment `succeeded` and `ended`
    checkpoints.log(DecoratedEnum.BRANCH_2_SUCCESS)
    counter_assertions = [
        CounterAssertion(
            "worker_checkpoints_begun_total", {"flow": "DecoratedEnum"}, 0
        ),
        CounterAssertion(
            "worker_checkpoints_succeeded_total", {"flow": "DecoratedEnum"}, 1
        ),
        CounterAssertion(
            "worker_checkpoints_failed_total", {"flow": "DecoratedEnum"}, 0
        ),
        CounterAssertion(
            "worker_checkpoints_ended_total", {"flow": "DecoratedEnum"}, 1
        ),
        CounterAssertion(
            "worker_checkpoints_events_total",
            {"flow": "DecoratedEnum", "checkpoint": "BRANCH_2_SUCCESS"},
            1,
        ),
    ]
    with CounterAssertionSet(counter_assertions):
        checkpoints.log(DecoratedEnum.BRANCH_2_SUCCESS)
    assert metrics.data["DecoratedEnum.events.BRANCH_2_SUCCESS"] == 1
    assert metrics.data["DecoratedEnum.total.begun"] == 1
    assert metrics.data["DecoratedEnum.total.failed"] == 1