Skip to content
36 changes: 35 additions & 1 deletion src/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class Exporter: # pylint: disable=too-many-instance-attributes,too-many-branche

def __init__(self, buckets=None):
self.registry = CollectorRegistry(auto_describe=True)
self.queue_cache = set()
self.state_counters = {
"task-sent": Counter(
"celery_task_sent",
Expand Down Expand Up @@ -92,6 +93,39 @@ def __init__(self, buckets=None):
registry=self.registry,
buckets=buckets or Histogram.DEFAULT_BUCKETS,
)
self.celery_queue_length = Gauge(
"celery_queue_length",
"The number of message in broker queue.",
["queue_name"],
registry=self.registry,
)
self.celery_active_consumer_count = Gauge(
"celery_active_consumer_count",
"The number of active consumer in broker queue.",
["queue_name"],
registry=self.registry,
)

def track_queue_length(self):
# request workers to response active queues
# we need to cache queue info in exporter in case all workers are offline
# so that no worker response exporter make active_queues return None
queues = self.app.control.inspect().active_queues() or {}
for info_list in queues.values():
for queue_info in info_list:
self.queue_cache.add(queue_info["name"])

with self.app.connection() as connection:
for queue in self.queue_cache:
try:
ret = connection.default_channel.queue_declare(queue=queue, passive=True)
length, consumer_count = ret.message_count, ret.consumer_count
except Exception as ex: # pylint: disable=broad-except
logger.exception(f"Queue {queue} declare failed: {str(ex)}")
length = 0
consumer_count = 0
self.celery_queue_length.labels(queue_name=queue).set(length)
self.celery_active_consumer_count.labels(queue_name=queue).set(consumer_count)

def track_task_event(self, event):
self.state.event(event)
Expand Down Expand Up @@ -200,7 +234,7 @@ def run(self, click_params):
handlers[key] = self.track_task_event

with self.app.connection() as connection:
start_http_server(self.registry, connection, click_params["port"])
start_http_server(self.registry, connection, click_params["port"], self.track_queue_length)
while True:
try:
recv = self.app.events.Receiver(connection, handlers=handlers)
Expand Down
4 changes: 3 additions & 1 deletion src/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def index():

@blueprint.route("/metrics")
def metrics():
current_app.config["metrics_puller"]()
encoder, content_type = choose_encoder(request.headers.get("accept"))
output = encoder(current_app.config["registry"])
return output, 200, {"Content-Type": content_type}
Expand All @@ -50,10 +51,11 @@ def health():
return f"Connected to the broker {conn.as_uri()}"


def start_http_server(registry, celery_connection, port):
def start_http_server(registry, celery_connection, port, metrics_puller):
app = Flask(__name__)
app.config["registry"] = registry
app.config["celery_connection"] = celery_connection
app.config["metrics_puller"] = metrics_puller
app.register_blueprint(blueprint)
Thread(
target=serve,
Expand Down
88 changes: 48 additions & 40 deletions src/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
import requests
from click.testing import CliRunner
from requests.exceptions import HTTPError
from celery.contrib.testing.worker import start_worker

from .cli import cli


@pytest.mark.celery()
def test_integration(celery_app, celery_worker):
def test_integration(celery_app):
def run():
CliRunner().invoke(
cli,
Expand All @@ -34,49 +35,56 @@ def succeed():
def fail():
raise HTTPError("Big, big error")

celery_worker.reload()
# start worker first so the exporter can fetch and cache queue information
with start_worker(celery_app, without_heartbeat=False) as celery_worker:
res = requests.get("http://localhost:23000/metrics")
assert res.status_code == 200
assert 'celery_queue_length{queue_name="celery"} 0.0' in res.text
assert 'celery_active_consumer_count{queue_name="celery"} 0.0' in res.text

succeed.apply_async()
succeed.apply_async()
fail.apply_async()

time.sleep(2)
# assert celery_queue_length when message in broker but no worker start
res = requests.get("http://localhost:23000/metrics")
assert res.status_code == 200
assert 'celery_queue_length{queue_name="celery"} 3.0' in res.text
assert 'celery_active_consumer_count{queue_name="celery"} 0.0' in res.text

# pylint: disable=line-too-long
assert (
f'celery_task_sent_total{{hostname="{celery_worker.hostname}",name="src.test_cli.succeed"}} 2.0'
in res.text
)
assert (
f'celery_task_sent_total{{hostname="{celery_worker.hostname}",name="src.test_cli.fail"}} 1.0'
in res.text
)
assert (
f'celery_task_received_total{{hostname="{celery_worker.hostname}",name="src.test_cli.succeed"}} 2.0'
in res.text
)
assert (
f'celery_task_received_total{{hostname="{celery_worker.hostname}",name="src.test_cli.fail"}} 1.0'
in res.text
)
assert (
f'celery_task_started_total{{hostname="{celery_worker.hostname}",name="src.test_cli.succeed"}} 2.0'
in res.text
)
assert (
f'celery_task_started_total{{hostname="{celery_worker.hostname}",name="src.test_cli.fail"}} 1.0'
in res.text
)
assert (
f'celery_task_succeeded_total{{hostname="{celery_worker.hostname}",name="src.test_cli.succeed"}} 2.0'
in res.text
)
assert (
f'celery_task_failed_total{{exception="HTTPError",hostname="{celery_worker.hostname}",name="src.test_cli.fail"}} 1.0'
in res.text
)
assert (
f'celery_task_runtime_count{{hostname="{celery_worker.hostname}",name="src.test_cli.succeed"}} 2.0'
in res.text
)
# start worker and consume message in broker
with start_worker(celery_app, without_heartbeat=False) as celery_worker:
time.sleep(2)
res = requests.get("http://localhost:23000/metrics")
assert res.status_code == 200
# pylint: disable=line-too-long
assert (
f'celery_task_received_total{{hostname="{celery_worker.hostname}",name="src.test_cli.succeed"}} 2.0'
in res.text
)
assert (
f'celery_task_received_total{{hostname="{celery_worker.hostname}",name="src.test_cli.fail"}} 1.0'
in res.text
)
assert (
f'celery_task_started_total{{hostname="{celery_worker.hostname}",name="src.test_cli.succeed"}} 2.0'
in res.text
)
assert (
f'celery_task_started_total{{hostname="{celery_worker.hostname}",name="src.test_cli.fail"}} 1.0'
in res.text
)
assert (
f'celery_task_succeeded_total{{hostname="{celery_worker.hostname}",name="src.test_cli.succeed"}} 2.0'
in res.text
)
assert (
f'celery_task_failed_total{{exception="HTTPError",hostname="{celery_worker.hostname}",name="src.test_cli.fail"}} 1.0'
in res.text
)
assert (
f'celery_task_runtime_count{{hostname="{celery_worker.hostname}",name="src.test_cli.succeed"}} 2.0'
in res.text
)
assert 'celery_queue_length{queue_name="celery"} 0.0' in res.text
assert 'celery_active_consumer_count{queue_name="celery"} 0.0' in res.text