Skip to content

Commit

Permalink
Emit metric of how long a task was enqueued (#1183)
Browse files Browse the repository at this point in the history
* Emit metric of how long a task was enqueued

This is being done mostly because of the `label-analysis` task, where we want a better understanding
of how much time tasks stay in the queue, and what its impact is on overall ATS.

But this is a useful metric anyway for fine-tuning other celery parameters, and for quantifying,
in hard numbers, the gain of having dedicated queues. So we're doing it for all tasks.

The idea is simple: we add a header with the timestamp where the task was created (i.e. added to the queue)
and on the other side, when it's executed, we emit a metric with the timedelta.

Extract time in queue metric for all worker's tasks (Coverage-Team #57)

* Use isoformat instead of custom format for the header
  • Loading branch information
giovanni-guidini committed Jun 13, 2023
1 parent df09438 commit e61c91a
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 3 deletions.
1 change: 1 addition & 0 deletions requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pytest-cov
pytest-celery
pytest-mock
pytest-sqlalchemy
pytest-freezegun
pytest
python-dateutil
python-json-logger
Expand Down
6 changes: 6 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ factory-boy==3.2.0
# via -r requirements.in
faker==8.8.2
# via factory-boy
freezegun==1.2.2
# via pytest-freezegun
google-api-core==1.26.1
# via google-cloud-core
google-auth==1.27.1
Expand Down Expand Up @@ -192,6 +194,7 @@ pytest==7.2.0
# -r requirements.in
# pytest-asyncio
# pytest-cov
# pytest-freezegun
# pytest-mock
# pytest-sqlalchemy
pytest-asyncio==0.14.0
Expand All @@ -200,6 +203,8 @@ pytest-celery==0.0.0
# via -r requirements.in
pytest-cov==2.11.1
# via -r requirements.in
pytest-freezegun==0.4.2
# via -r requirements.in
pytest-mock==1.13.0
# via -r requirements.in
pytest-sqlalchemy==0.2.1
Expand All @@ -210,6 +215,7 @@ python-dateutil==2.8.1
# analytics-python
# botocore
# faker
# freezegun
python-json-logger==0.1.11
# via -r requirements.in
pytz==2022.1
Expand Down
25 changes: 24 additions & 1 deletion tasks/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import signal
from datetime import datetime

from celery.exceptions import SoftTimeLimitExceeded
from celery.worker.request import Request
Expand Down Expand Up @@ -58,7 +59,14 @@ def apply_async(self, args=None, kwargs=None, **options):
"soft_time_limit": extra_config.get("soft_timelimit", None),
}
options = {**options, **celery_compatible_config}
return super().apply_async(args=args, kwargs=kwargs, **options)
# Pass current time in task headers so we can emit a metric of
# how long the task was in the queue for
current_time = datetime.now()
headers = {
**options.get("headers", {}),
"created_timestamp": current_time.isoformat(),
}
return super().apply_async(args=args, kwargs=kwargs, headers=headers, **options)

def _analyse_error(self, exception: SQLAlchemyError, *args, **kwargs):
try:
Expand Down Expand Up @@ -86,11 +94,26 @@ def _analyse_error(self, exception: SQLAlchemyError, *args, **kwargs):
exc_info=True,
)

def _emit_queue_metrics(self):
    """Emit timing metrics for how long this task sat in the queue.

    Reads the ``created_timestamp`` header (stamped by ``apply_async`` when
    the task was enqueued) and emits the enqueue-to-execution delta:
      - per task: ``<metrics_prefix>.time_in_queue``
      - per queue and per task+queue, when the routing key is available.
    Silently does nothing if the header is absent (e.g. tasks enqueued by
    an older producer that did not set it).
    """
    created_timestamp = self.request.get("created_timestamp", None)
    if not created_timestamp:
        # Nothing to measure without the enqueue-time header.
        return
    # NOTE(review): both sides use naive local time (producer stamps
    # datetime.now().isoformat()) — assumes producer and worker clocks/zones
    # agree; confirm if workers run in mixed timezones.
    delta = datetime.now() - datetime.fromisoformat(created_timestamp)
    metrics.timing(f"{self.metrics_prefix}.time_in_queue", delta)
    queue_name = self.request.get("delivery_info", {}).get("routing_key", None)
    if queue_name:
        metrics.timing(f"worker.queues.{queue_name}.time_in_queue", delta)
        metrics.timing(f"{self.metrics_prefix}.{queue_name}.time_in_queue", delta)

def run(self, *args, **kwargs):
# Setup signal handlers if something fails catastrophically
signal.signal(signal.SIGTERM, self._signal_handler)
signal.signal(signal.SIGABRT, self._signal_handler)
signal.signal(signal.SIGSEGV, self._signal_handler)
self._emit_queue_metrics()
with metrics.timer(f"{self.metrics_prefix}.full"):
db_session = get_db_session()
try:
Expand Down
51 changes: 49 additions & 2 deletions tasks/tests/unit/test_base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import signal
from datetime import timedelta
from pathlib import Path
from unittest.mock import patch

Expand All @@ -7,7 +8,7 @@
from celery import chain
from celery.contrib.testing.mocks import TaskMessage
from celery.exceptions import Retry, SoftTimeLimitExceeded
from mock import MagicMock
from mock import call
from shared.billing import BillingPlan
from shared.celery_config import sync_repos_task_name, upload_task_name
from sqlalchemy.exc import DBAPIError, IntegrityError, InvalidRequestError
Expand Down Expand Up @@ -82,12 +83,41 @@ def test_hard_time_limit_task_from_default_app(self, mocker):
r = SampleTask()
assert r.hard_time_limit_task == 480

@pytest.mark.freeze_time("2023-06-13T10:01:01.000123")
def test_sample_run(self, mocker, dbsession):
mocked_get_db_session = mocker.patch("tasks.base.get_db_session")
mocked_metrics = mocker.patch("tasks.base.metrics")
mock_task_request = mocker.patch("tasks.base.BaseCodecovTask.request")
fake_request_values = dict(
created_timestamp="2023-06-13 10:00:00.000000",
delivery_info={"routing_key": "my-queue"},
)
mock_task_request.get.side_effect = (
lambda key, default: fake_request_values.get(key, default)
)
mocked_get_db_session.return_value = dbsession
result = SampleTask().run()
task_instance = SampleTask()
result = task_instance.run()
assert result == {"unusual": "return", "value": ["There"]}
assert mocked_metrics.timing.call_count == 3
mocked_metrics.timing.assert_has_calls(
[
call(
"worker.task.test.SampleTask.time_in_queue",
timedelta(seconds=61, microseconds=123),
),
call(
"worker.queues.my-queue.time_in_queue",
timedelta(seconds=61, microseconds=123),
),
call(
"worker.task.test.SampleTask.my-queue.time_in_queue",
timedelta(seconds=61, microseconds=123),
),
]
)

@patch("tasks.base.BaseCodecovTask._emit_queue_metrics")
def test_sample_run_db_exception(self, mocker, dbsession):
mocked_get_db_session = mocker.patch("tasks.base.get_db_session")
mocked_get_db_session.return_value = dbsession
Expand All @@ -96,6 +126,7 @@ def test_sample_run_db_exception(self, mocker, dbsession):
DBAPIError("statement", "params", "orig")
).run()

@patch("tasks.base.BaseCodecovTask._emit_queue_metrics")
def test_sample_run_integrity_error(self, mocker, dbsession):
mocked_get_db_session = mocker.patch("tasks.base.get_db_session")
mocked_get_db_session.return_value = dbsession
Expand All @@ -104,6 +135,7 @@ def test_sample_run_integrity_error(self, mocker, dbsession):
IntegrityError("statement", "params", "orig")
).run()

@patch("tasks.base.BaseCodecovTask._emit_queue_metrics")
def test_sample_run_deadlock_exception(self, mocker, dbsession):
mocked_get_db_session = mocker.patch("tasks.base.get_db_session")
mocked_get_db_session.return_value = dbsession
Expand All @@ -112,12 +144,14 @@ def test_sample_run_deadlock_exception(self, mocker, dbsession):
psycopg2.errors.DeadlockDetected()
).run()

@patch("tasks.base.BaseCodecovTask._emit_queue_metrics")
def test_sample_run_operationalerror_exception(self, mocker, dbsession):
mocked_get_db_session = mocker.patch("tasks.base.get_db_session")
mocked_get_db_session.return_value = dbsession
with pytest.raises(Retry):
SampleTaskWithArbitraryPostgresError(psycopg2.OperationalError()).run()

@patch("tasks.base.BaseCodecovTask._emit_queue_metrics")
def test_sample_run_softimeout(self, mocker, dbsession):
mocked_get_db_session = mocker.patch("tasks.base.get_db_session")
mocked_get_db_session.return_value = dbsession
Expand Down Expand Up @@ -305,6 +339,7 @@ def fake_repos(self, dbsession, fake_owners):
dbsession.flush()
return (repo, repo_enterprise_cloud)

@pytest.mark.freeze_time("2023-06-13T10:01:01.000123")
def test_apply_async_override(self, mocker):

mock_get_db_session = mocker.patch("tasks.base.get_db_session")
Expand All @@ -329,10 +364,12 @@ def test_apply_async_override(self, mocker):
mocked_apply_async.assert_called_with(
args=None,
kwargs=kwargs,
headers=dict(created_timestamp="2023-06-13T10:01:01.000123"),
time_limit=400,
soft_time_limit=200,
)

@pytest.mark.freeze_time("2023-06-13T10:01:01.000123")
def test_apply_async_override_with_chain(self, mocker):

mock_get_db_session = mocker.patch("tasks.base.get_db_session")
Expand Down Expand Up @@ -362,7 +399,12 @@ def test_apply_async_override_with_chain(self, mocker):
assert "kwargs" in kwargs and kwargs.get("kwargs") == {"n": 1}
assert "chain" in kwargs and len(kwargs.get("chain")) == 1
assert "task_id" in kwargs
assert "headers" in kwargs
assert kwargs.get("headers") == dict(
created_timestamp="2023-06-13T10:01:01.000123"
)

@pytest.mark.freeze_time("2023-06-13T10:01:01.000123")
def test_real_example_no_override(
self, mocker, dbsession, mock_configuration, fake_repos
):
Expand Down Expand Up @@ -402,9 +444,11 @@ def test_real_example_no_override(
args=None,
kwargs=kwargs,
soft_time_limit=None,
headers=dict(created_timestamp="2023-06-13T10:01:01.000123"),
time_limit=None,
)

@pytest.mark.freeze_time("2023-06-13T10:01:01.000123")
def test_real_example_override_from_celery(
self, mocker, dbsession, mock_configuration, fake_repos
):
Expand Down Expand Up @@ -444,9 +488,11 @@ def test_real_example_override_from_celery(
args=None,
kwargs=kwargs,
soft_time_limit=500,
headers=dict(created_timestamp="2023-06-13T10:01:01.000123"),
time_limit=600,
)

@pytest.mark.freeze_time("2023-06-13T10:01:01.000123")
def test_real_example_override_from_upload(
self, mocker, dbsession, mock_configuration, fake_repos
):
Expand Down Expand Up @@ -486,5 +532,6 @@ def test_real_example_override_from_upload(
args=None,
kwargs=kwargs,
soft_time_limit=400,
headers=dict(created_timestamp="2023-06-13T10:01:01.000123"),
time_limit=450,
)

0 comments on commit e61c91a

Please sign in to comment.