Skip to content

Commit

Permalink
LB-1673: Create endpoints for monitoring service status (#3031)
Browse files Browse the repository at this point in the history
* Add stats monitoring endpoint

* Merge all endpoints into one single, easy endpoint

* Finish implementing the service status endpoint

* Service status is now returned for the current-status page

* PEP-8 formatting

* more PEP-8 fixes

* Do not call apis, call directly

* Add missing files
  • Loading branch information
mayhem authored Nov 19, 2024
1 parent 63ae751 commit b1f879e
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 4 deletions.
5 changes: 4 additions & 1 deletion listenbrainz/webserver/views/index.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import locale
import os
import requests
import time
import os

from brainzutils import cache
from datetime import datetime
Expand All @@ -21,6 +21,7 @@
from listenbrainz.webserver import flash, db_conn, meb_conn, ts_conn
from listenbrainz.webserver.timescale_connection import _ts
from listenbrainz.webserver.redis_connection import _redis
from listenbrainz.webserver.views.status_api import get_service_status
import listenbrainz.db.stats as db_stats
import listenbrainz.db.user_relationship as db_user_relationship
from listenbrainz.db.donation import get_recent_donors
Expand Down Expand Up @@ -92,6 +93,7 @@ def current_status():

load = "%.2f %.2f %.2f" % os.getloadavg()

service_status = get_service_status()
listen_count = _ts.get_total_listen_count()
try:
user_count = format(int(_get_user_count()), ',d')
Expand All @@ -114,6 +116,7 @@ def current_status():

data = {
"load": load,
"service-status": service_status,
"listenCount": format(int(listen_count), ",d") if listen_count else "0",
"userCount": user_count,
"listenCountsPerDay": listen_counts_per_day,
Expand Down
9 changes: 9 additions & 0 deletions listenbrainz/webserver/views/stats_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,15 @@ def _get_entity_stats(user_name: str, entity: str, count_key: str):
}})


def get_entity_stats_last_updated(user_name: str, entity: str, count_key: str):
    """ Return the timestamp of when the given entity stats were last computed for a user.

    Args:
        user_name: MusicBrainz name of the user whose stats to look up.
        entity: the stats entity (e.g. "artists").
        count_key: unused here; kept for signature parity with _get_entity_stats.

    Returns:
        The stats' last_updated value, or None if no stats exist for the user.
    """
    user, stats_range = _validate_stats_user_params(user_name)
    stats = db_stats.get(user["id"], entity, stats_range, EntityRecord)
    if stats is None:
        return None

    # NOTE: the original code also called _process_user_entity(stats, 0, 1) and
    # discarded the result — dead work left over from copy-paste; removed.
    return stats.last_updated


@stats_api_bp.route("/user/<user_name>/listening-activity")
@crossdomain
@ratelimit()
Expand Down
164 changes: 161 additions & 3 deletions listenbrainz/webserver/views/status_api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
from flask import Blueprint, request, jsonify
from datetime import datetime
from flask import Blueprint, request, jsonify, current_app
import requests
from time import sleep, time

from kombu import Connection, Queue, Exchange
from kombu.exceptions import KombuError
from werkzeug.exceptions import ServiceUnavailable

from listenbrainz.webserver.errors import APIBadRequest, APINotFound
from brainzutils.ratelimit import ratelimit

from brainzutils import cache
from listenbrainz.webserver import db_conn, ts_conn
from listenbrainz.db.playlist import get_recommendation_playlists_for_user
import listenbrainz.db.dump as db_dump
from listenbrainz.webserver.views.stats_api import get_entity_stats_last_updated

STATUS_PREFIX = 'listenbrainz.status' # prefix used in key to cache status
CACHE_TIME = 60 * 60 # time in seconds we cache the fetched data
DUMP_CACHE_TIME = 24 * 60 * 60 # time in seconds we cache the dump check
LISTEN_COUNT_CACHE_TIME = 30 * 60 # time in seconds we cache the listen count
PLAYLIST_CACHE_TIME = 24 * 30 * 60 # time in seconds we cache latest playlist timestamp

status_api_bp = Blueprint("status_api_v1", __name__)

Expand Down Expand Up @@ -34,7 +51,7 @@ def get_dump_info():
dump_id = request.args.get("id")
if dump_id is None:
try:
dump = db_dump.get_dump_entries()[0] # return the latest dump
dump = db_dump.get_dump_entries()[0] # return the latest dump
except IndexError:
raise APINotFound("No dump entry exists.")
else:
Expand Down Expand Up @@ -64,3 +81,144 @@ def _convert_timestamp_to_string_dump_format(timestamp):
String of the format "20190625-170100"
"""
return timestamp.strftime("%Y%m%d-%H%M%S")


def get_stats_timestamp():
    """ Check to see when statistics were last generated for a "random" user. Returns unix epoch timestamp"""

    cache_key = STATUS_PREFIX + ".stats-timestamp"

    # Serve from cache when a recent value is available.
    cached = cache.get(cache_key)
    if cached is not None:
        return cached

    # Cache miss: fetch the last-updated timestamp for a known user's artist stats.
    fetched = get_entity_stats_last_updated("rob", "artists", "total_artist_count")
    if fetched is None:
        return None

    cache.set(cache_key, fetched, CACHE_TIME)
    return fetched


def get_playlists_timestamp():
    """ Check to see when recommendation playlists were last generated for a "random" user.

    Returns:
        Unix epoch timestamp of the most recent recommendation playlist's
        last_updated field, or None if the user has no playlists.
    """

    cache_key = STATUS_PREFIX + ".playlist-timestamp"
    last_updated = cache.get(cache_key)
    if last_updated is None:
        playlists = get_recommendation_playlists_for_user(db_conn, ts_conn, 1)
        # `not playlists` is True for both None and an empty list
        if not playlists:
            return None

        last_updated = int(playlists[0].last_updated.timestamp())
        cache.set(cache_key, last_updated, PLAYLIST_CACHE_TIME)

    return last_updated


def get_incoming_listens_count():
    """ Check to see how many listens are currently in the incoming RabbitMQ queue.

    Returns:
        The number of messages in the incoming queue (int), or None if
        RabbitMQ is unavailable.
    """

    cache_key = STATUS_PREFIX + ".incoming_listens"
    listen_count = cache.get(cache_key)
    if listen_count is None:
        # logger.warn is a deprecated alias; use warning()
        current_app.logger.warning("no cached data!")
        try:
            incoming_exchange = Exchange(current_app.config["INCOMING_EXCHANGE"], "fanout", durable=False)
            incoming_queue = Queue(current_app.config["INCOMING_QUEUE"], exchange=incoming_exchange, durable=True)

            with Connection(hostname=current_app.config["RABBITMQ_HOST"],
                            userid=current_app.config["RABBITMQ_USERNAME"],
                            port=current_app.config["RABBITMQ_PORT"],
                            password=current_app.config["RABBITMQ_PASSWORD"],
                            virtual_host=current_app.config["RABBITMQ_VHOST"]) as conn:

                # Passive declare does not modify the queue; the middle element
                # of the returned tuple is the current message count.
                _, listen_count, _ = incoming_queue.queue_declare(channel=conn.channel(), passive=True)
        except KombuError as err:
            # Lazy %-style args so formatting only happens if the record is emitted
            current_app.logger.error("RabbitMQ is currently not available. Error: %s", err)
            return None

        cache.set(cache_key, listen_count, LISTEN_COUNT_CACHE_TIME)

    return listen_count


def get_dump_timestamp():
    """ Check when the latest dump was generated.

    Returns the unix epoch timestamp (int) of the most recent dump entry's
    "created" field, or None if no dump entries exist.
    """

    cache_key = STATUS_PREFIX + ".dump_timestamp"
    dump_timestamp = cache.get(cache_key)
    if dump_timestamp is None:
        try:
            # get_dump_entries() is ordered newest-first; [0] raises IndexError
            # when there are no dumps at all
            dump = db_dump.get_dump_entries()[0]  # return the latest dump
            dump_timestamp = int(dump["created"].timestamp())
            cache.set(cache_key, dump_timestamp, DUMP_CACHE_TIME)
        except IndexError:
            return None

    return dump_timestamp


def get_service_status():
    """ Fetch the age of the last output of various services and return a dict:
        {
            "dump_age": null,
            "incoming_listen_count": 2,
            "playlists_age": 63229,
            "stats_age": 418605,
            "time": 1731429303
        }
    """

    current_ts = int(time())

    def _age(timestamp):
        # An unknown timestamp propagates as a null age
        return None if timestamp is None else current_ts - timestamp

    # Same fetch order as before: dump, incoming queue, stats, playlists
    dump_ts = get_dump_timestamp()
    listen_count = get_incoming_listens_count()
    stats_ts = get_stats_timestamp()
    playlists_ts = get_playlists_timestamp()

    return {
        "time": current_ts,
        "dump_age": _age(dump_ts),
        "stats_age": _age(stats_ts),
        "playlists_age": _age(playlists_ts),
        "incoming_listen_count": listen_count
    }


@status_api_bp.route("/service-status", methods=["GET"])
@ratelimit()
def service_status():
    """ Fetch the recently updated metrics for age of stats, playlists, dumps and the number of items in the incoming
        queue. This function returns JSON:
    .. code-block:: json
        {
            "dump_age": 60309,
            "incoming_listen_count": 2,
            "playlists_age": 63229,
            "stats_age": 418605,
            "time": 1731429303
        }
    Ages are in seconds relative to "time"; a value is null when the
    underlying service data could not be determined.
    :statuscode 200: You have data.
    :resheader Content-Type: *application/json*
    """

    return jsonify(get_service_status())

0 comments on commit b1f879e

Please sign in to comment.