Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions authentik/admin/api/workers.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
"""authentik administration overview"""

from socket import gethostname

from django.conf import settings
from drf_spectacular.utils import extend_schema, inline_serializer
from rest_framework.fields import IntegerField
from packaging.version import parse
from rest_framework.fields import BooleanField, CharField
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.views import APIView

from authentik import get_full_version
from authentik.rbac.permissions import HasPermission
from authentik.root.celery import CELERY_APP

Expand All @@ -16,11 +20,38 @@ class WorkerView(APIView):

permission_classes = [HasPermission("authentik_rbac.view_system_info")]

@extend_schema(responses=inline_serializer("Workers", fields={"count": IntegerField()}))
@extend_schema(
responses=inline_serializer(
"Worker",
fields={
"worker_id": CharField(),
"version": CharField(),
"version_matching": BooleanField(),
},
many=True,
)
)
def get(self, request: Request) -> Response:
"""Get currently connected worker count."""
count = len(CELERY_APP.control.ping(timeout=0.5))
raw: list[dict[str, dict]] = CELERY_APP.control.ping(timeout=0.5)
our_version = parse(get_full_version())
response = []
for worker in raw:
key = list(worker.keys())[0]
version = worker[key].get("version")
version_matching = False
if version:
version_matching = parse(version) == our_version
response.append(
{"worker_id": key, "version": version, "version_matching": version_matching}
)
# In debug we run with `task_always_eager`, so tasks are ran on the main process
if settings.DEBUG: # pragma: no cover
count += 1
return Response({"count": count})
response.append(
{
"worker_id": f"authentik-debug@{gethostname()}",
"version": get_full_version(),
"version_matching": True,
}
)
return Response(response)
3 changes: 1 addition & 2 deletions authentik/admin/apps.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
"""authentik admin app config"""

from prometheus_client import Gauge, Info
from prometheus_client import Info

from authentik.blueprints.apps import ManagedAppConfig

PROM_INFO = Info("authentik_version", "Currently running authentik version")
GAUGE_WORKERS = Gauge("authentik_admin_workers", "Currently connected workers")


class AuthentikAdminConfig(ManagedAppConfig):
Expand Down
27 changes: 24 additions & 3 deletions authentik/admin/signals.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
"""admin signals"""

from django.dispatch import receiver
from packaging.version import parse
from prometheus_client import Gauge

from authentik.admin.apps import GAUGE_WORKERS
from authentik import get_full_version
from authentik.root.celery import CELERY_APP
from authentik.root.monitoring import monitoring_set

GAUGE_WORKERS = Gauge(
"authentik_admin_workers",
"Currently connected workers, their versions and if they are the same version as authentik",
["version", "version_matched"],
)


_version = parse(get_full_version())


@receiver(monitoring_set)
def monitoring_set_workers(sender, **kwargs):
"""Set worker gauge"""
count = len(CELERY_APP.control.ping(timeout=0.5))
GAUGE_WORKERS.set(count)
raw: list[dict[str, dict]] = CELERY_APP.control.ping(timeout=0.5)
worker_version_count = {}
for worker in raw:
key = list(worker.keys())[0]
version = worker[key].get("version")
version_matching = False
if version:
version_matching = parse(version) == _version
worker_version_count.setdefault(version, {"count": 0, "matching": version_matching})
worker_version_count[version]["count"] += 1
for version, stats in worker_version_count.items():
GAUGE_WORKERS.labels(version, stats["matching"]).set(stats["count"])
2 changes: 1 addition & 1 deletion authentik/admin/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_workers(self):
response = self.client.get(reverse("authentik_api:admin_workers"))
self.assertEqual(response.status_code, 200)
body = loads(response.content)
self.assertEqual(body["count"], 0)
self.assertEqual(len(body), 0)

def test_metrics(self):
"""Test metrics API"""
Expand Down
8 changes: 8 additions & 0 deletions authentik/root/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@
task_prerun,
worker_ready,
)
from celery.worker.control import inspect_command
from django.conf import settings
from django.db import ProgrammingError
from django_tenants.utils import get_public_schema_name
from structlog.contextvars import STRUCTLOG_KEY_PREFIX
from structlog.stdlib import get_logger
from tenant_schemas_celery.app import CeleryApp as TenantAwareCeleryApp

from authentik import get_full_version
from authentik.lib.sentry import before_send
from authentik.lib.utils.errors import exception_to_string

Expand Down Expand Up @@ -159,6 +161,12 @@ def update_heartbeat_file(self, worker: Worker):
HEARTBEAT_FILE.touch()


@inspect_command(default_timeout=0.2)
def ping(state, **kwargs):
"""Ping worker(s)."""
return {"ok": "pong", "version": get_full_version()}


CELERY_APP.config_from_object(settings.CELERY)

# Load task modules from all registered Django app configs.
Expand Down
2 changes: 1 addition & 1 deletion blueprints/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -4159,7 +4159,7 @@
"re_evaluate_policies": {
"type": "boolean",
"title": "Re evaluate policies",
"description": "Evaluate policies when the Stage is present to the user."
"description": "Evaluate policies when the Stage is presented to the user."
},
"order": {
"type": "integer",
Expand Down
20 changes: 14 additions & 6 deletions schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ paths:
description: ''
/admin/workers/:
get:
operationId: admin_workers_retrieve
operationId: admin_workers_list
description: Get currently connected worker count.
tags:
- admin
Expand All @@ -360,7 +360,9 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/Workers'
type: array
items:
$ref: '#/components/schemas/Worker'
description: ''
'400':
content:
Expand Down Expand Up @@ -56987,13 +56989,19 @@ components:
required:
- aaguid
- description
Workers:
Worker:
type: object
properties:
count:
type: integer
worker_id:
type: string
version:
type: string
version_matching:
type: boolean
required:
- count
- version
- version_matching
- worker_id
modelRequest:
oneOf:
- $ref: '#/components/schemas/GoogleWorkspaceProviderRequest'
Expand Down
23 changes: 15 additions & 8 deletions web/src/admin/admin-overview/cards/WorkerStatusCard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,41 @@ import { msg } from "@lit/localize";
import { TemplateResult, html } from "lit";
import { customElement } from "lit/decorators.js";

import { AdminApi } from "@goauthentik/api";
import { AdminApi, Worker } from "@goauthentik/api";

@customElement("ak-admin-status-card-workers")
export class WorkersStatusCard extends AdminStatusCard<number> {
export class WorkersStatusCard extends AdminStatusCard<Worker[]> {
icon = "pf-icon pf-icon-server";

getPrimaryValue(): Promise<number> {
return new AdminApi(DEFAULT_CONFIG).adminWorkersRetrieve().then((workers) => {
return workers.count;
});
getPrimaryValue(): Promise<Worker[]> {
return new AdminApi(DEFAULT_CONFIG).adminWorkersList();
}

renderHeader(): TemplateResult {
return html`${msg("Workers")}`;
}

getStatus(value: number): Promise<AdminStatus> {
if (value < 1) {
getStatus(value: Worker[]): Promise<AdminStatus> {
if (value.length < 1) {
return Promise.resolve<AdminStatus>({
icon: "fa fa-times-circle pf-m-danger",
message: html`${msg("No workers connected. Background tasks will not run.")}`,
});
} else if (value.filter((w) => !w.versionMatching).length > 0) {
return Promise.resolve<AdminStatus>({
icon: "fa fa-times-circle pf-m-danger",
message: html`${msg("Worker with incorrect version connected.")}`,
});
} else {
return Promise.resolve<AdminStatus>({
icon: "fa fa-check-circle pf-m-success",
});
}
}

renderValue() {
return html`${this.value?.length}`;
}
}

declare global {
Expand Down