diff --git a/src/exporter.py b/src/exporter.py
index b302f0f..476bde5 100644
--- a/src/exporter.py
+++ b/src/exporter.py
@@ -16,6 +16,7 @@ class Exporter:  # pylint: disable=too-many-instance-attributes,too-many-branche
     def __init__(self, buckets=None):
         self.registry = CollectorRegistry(auto_describe=True)
+        self.queue_cache = set()
         self.state_counters = {
             "task-sent": Counter(
                 "celery_task_sent",
@@ -92,6 +93,39 @@ def __init__(self, buckets=None):
             registry=self.registry,
             buckets=buckets or Histogram.DEFAULT_BUCKETS,
         )
+        self.celery_queue_length = Gauge(
+            "celery_queue_length",
+            "The number of message in broker queue.",
+            ["queue_name"],
+            registry=self.registry,
+        )
+        self.celery_active_consumer_count = Gauge(
+            "celery_active_consumer_count",
+            "The number of active consumer in broker queue.",
+            ["queue_name"],
+            registry=self.registry,
+        )
+
+    def track_queue_length(self):
+        # ask workers to report their active queues
+        # we need to cache queue info in the exporter in case all workers are offline,
+        # since no worker responding to the exporter would make active_queues return None
+        queues = self.app.control.inspect().active_queues() or {}
+        for info_list in queues.values():
+            for queue_info in info_list:
+                self.queue_cache.add(queue_info["name"])
+
+        with self.app.connection() as connection:
+            for queue in self.queue_cache:
+                try:
+                    ret = connection.default_channel.queue_declare(queue=queue, passive=True)
+                    length, consumer_count = ret.message_count, ret.consumer_count
+                except Exception as ex:  # pylint: disable=broad-except
+                    logger.exception(f"Queue {queue} declare failed: {str(ex)}")
+                    length = 0
+                    consumer_count = 0
+                self.celery_queue_length.labels(queue_name=queue).set(length)
+                self.celery_active_consumer_count.labels(queue_name=queue).set(consumer_count)
 
     def track_task_event(self, event):
         self.state.event(event)
@@ -200,7 +234,7 @@ def run(self, click_params):
                 handlers[key] = self.track_task_event
 
         with self.app.connection() as connection:
-            start_http_server(self.registry, connection, click_params["port"])
+            start_http_server(self.registry, connection, click_params["port"], self.track_queue_length)
             while True:
                 try:
                     recv = self.app.events.Receiver(connection, handlers=handlers)
diff --git a/src/http_server.py b/src/http_server.py
index 78e1a4c..5ed96fb 100644
--- a/src/http_server.py
+++ b/src/http_server.py
@@ -29,6 +29,7 @@ def index():
 
 @blueprint.route("/metrics")
 def metrics():
+    current_app.config["metrics_puller"]()
     encoder, content_type = choose_encoder(request.headers.get("accept"))
     output = encoder(current_app.config["registry"])
     return output, 200, {"Content-Type": content_type}
@@ -50,10 +51,11 @@ def health():
     return f"Connected to the broker {conn.as_uri()}"
 
 
-def start_http_server(registry, celery_connection, port):
+def start_http_server(registry, celery_connection, port, metrics_puller):
     app = Flask(__name__)
     app.config["registry"] = registry
     app.config["celery_connection"] = celery_connection
+    app.config["metrics_puller"] = metrics_puller
     app.register_blueprint(blueprint)
     Thread(
         target=serve,
diff --git a/src/test_cli.py b/src/test_cli.py
index 183a4cc..4f98bc9 100644
--- a/src/test_cli.py
+++ b/src/test_cli.py
@@ -5,12 +5,13 @@
 import requests
 from click.testing import CliRunner
 from requests.exceptions import HTTPError
+from celery.contrib.testing.worker import start_worker
 
 from .cli import cli
 
 
 @pytest.mark.celery()
-def test_integration(celery_app, celery_worker):
+def test_integration(celery_app):
     def run():
         CliRunner().invoke(
             cli,
@@ -34,15 +35,29 @@ def succeed():
     def fail():
         raise HTTPError("Big, big error")
 
-    celery_worker.reload()
+    # start a worker first so the exporter can fetch and cache queue information
+    with start_worker(celery_app, without_heartbeat=False) as celery_worker:
+        res = requests.get("http://localhost:23000/metrics")
+        assert res.status_code == 200
+        assert 'celery_queue_length{queue_name="celery"} 0.0' in res.text
+        assert 'celery_active_consumer_count{queue_name="celery"} 0.0' in res.text
+
     succeed.apply_async()
     succeed.apply_async()
     fail.apply_async()
 
-    time.sleep(2)
+    # assert celery_queue_length when messages are in the broker but no worker has started
     res = requests.get("http://localhost:23000/metrics")
     assert res.status_code == 200
+    assert 'celery_queue_length{queue_name="celery"} 3.0' in res.text
+    assert 'celery_active_consumer_count{queue_name="celery"} 0.0' in res.text
+
+    # start a worker and consume the messages in the broker
+    with start_worker(celery_app, without_heartbeat=False) as celery_worker:
+        time.sleep(2)
+        res = requests.get("http://localhost:23000/metrics")
+        assert res.status_code == 200
     # pylint: disable=line-too-long
     assert (
         f'celery_task_sent_total{{hostname="{celery_worker.hostname}",name="src.test_cli.succeed"}} 2.0'
         in res.text
     )
@@ -80,3 +95,5 @@ def fail():
         f'celery_task_runtime_count{{hostname="{celery_worker.hostname}",name="src.test_cli.succeed"}} 2.0'
         in res.text
     )
+    assert 'celery_queue_length{queue_name="celery"} 0.0' in res.text
+    assert 'celery_active_consumer_count{queue_name="celery"} 0.0' in res.text