Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""A command to update OWASP projects data."""

from django.core.management import call_command
from django.core.management.base import BaseCommand

from apps.owasp.models.project import Project
Expand Down Expand Up @@ -114,3 +115,10 @@ def handle(self, *_args, **options) -> None:

# Bulk save data.
Project.bulk_save(projects)

if offset == 0:
self.stdout.write(self.style.NOTICE("Updating project level compliance..."))
call_command("owasp_update_project_level_compliance")

self.stdout.write(self.style.NOTICE("Updating project health scores..."))
call_command("owasp_update_project_health_scores")
Original file line number Diff line number Diff line change
@@ -1,15 +1,39 @@
"""A command to update OWASP project health metrics scores."""
"""Update OWASP project health scores.

This command calculates health scores for OWASP projects
based on defined metrics and requirement thresholds.
Projects marked as level non-compliant receive an
additional score penalty to reflect misalignment with
official OWASP project classification.
"""

from django.core.management.base import BaseCommand

from apps.owasp.models.project_health_metrics import ProjectHealthMetrics
from apps.owasp.models.project_health_requirements import ProjectHealthRequirements

LEVEL_NON_COMPLIANCE_PENALTY = 10.0


class Command(BaseCommand):
"""Compute and update project health scores.

Health scores are derived from project metrics and
requirement definitions. Projects that fail level
compliance checks are penalized to ensure score
accuracy reflects governance alignment.
"""

help = "Update OWASP project health scores."

def handle(self, *args, **options):
"""Calculate and persist project health scores.

This method iterates over project health metrics,
evaluates each metric against defined requirements,
applies penalties for non-compliant project levels,
and stores the final computed score.
"""
forward_fields = {
"age_days": 6.0,
"contributors_count": 6.0,
Expand All @@ -22,6 +46,7 @@ def handle(self, *args, **options):
"total_pull_requests_count": 6.0,
"total_releases_count": 6.0,
}

backward_fields = {
"last_commit_days": 6.0,
"last_pull_request_days": 6.0,
Expand All @@ -32,23 +57,30 @@ def handle(self, *args, **options):
"unassigned_issues_count": 6.0,
}

project_health_metrics = []
project_health_requirements = {
phr.level: phr for phr in ProjectHealthRequirements.objects.all()
}
for metric in ProjectHealthMetrics.objects.filter(
score__isnull=True,
).select_related(
"project",
requirements_by_level = {req.level: req for req in ProjectHealthRequirements.objects.all()}

metrics_to_update = []

for metric in ProjectHealthMetrics.objects.filter(score__isnull=True).select_related(
"project"
):
# Calculate the score based on requirements.
requirements = requirements_by_level.get(metric.project.level)

if not requirements:
self.stdout.write(
self.style.WARNING(
f"Skipping {metric.project.name}: "
f"No requirements found for level {metric.project.level}"
)
)
continue

self.stdout.write(
self.style.NOTICE(f"Updating score for project: {metric.project.name}")
)

requirements = project_health_requirements[metric.project.level]

score = 0.0

for field, weight in forward_fields.items():
if int(getattr(metric, field)) >= int(getattr(requirements, field)):
score += weight
Expand All @@ -57,13 +89,16 @@ def handle(self, *args, **options):
if int(getattr(metric, field)) <= int(getattr(requirements, field)):
score += weight

metric.score = score
project_health_metrics.append(metric)
if metric.level_non_compliant:
score -= LEVEL_NON_COMPLIANCE_PENALTY

metric.score = max(score, 0.0)
metrics_to_update.append(metric)

if metrics_to_update:
ProjectHealthMetrics.bulk_save(
metrics_to_update,
fields=["score"],
)

ProjectHealthMetrics.bulk_save(
project_health_metrics,
fields=[
"score",
],
)
self.stdout.write(self.style.SUCCESS("Updated project health scores successfully."))
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""Update OWASP project level compliance status.

This command compares locally stored project levels against
official OWASP project classification data and determines
whether each project is level-compliant.

Projects whose locally assigned level does not match the
official OWASP classification are flagged as non-compliant.
This compliance flag is later used during health score
calculation to apply penalties where appropriate.
"""

import re
from decimal import Decimal, InvalidOperation

import requests
from django.core.management.base import BaseCommand

from apps.owasp.models.project_health_metrics import ProjectHealthMetrics
from apps.owasp.utils.project_level import map_level

LEVELS_URL = (
"https://raw.githubusercontent.com/OWASP/owasp.github.io/main/_data/project_levels.json"
)


def normalize_name(name: str) -> str:
"""Normalize project names for comparison."""
return re.sub(r"[^a-z0-9]+", "", name.lower().replace("owasp", ""))


class Command(BaseCommand):
"""Detect and persist OWASP project level compliance.

This command fetches official OWASP project level data,
maps it to the internal ProjectLevel enum, and updates
project health metrics to indicate whether a project's
stored level matches the official classification.
"""

help = "Detect and flag OWASP project level non-compliance."

def handle(self, *args, **options) -> None:
"""Execute project level compliance detection.

For each project health metric entry, this method
determines the expected project level based on
official OWASP data and marks the project as
level non-compliant when a mismatch is detected.
"""
try:
response = requests.get(LEVELS_URL, timeout=15)
response.raise_for_status()
official_data = response.json()
except (requests.RequestException, ValueError) as exc:
self.stderr.write(self.style.ERROR(f"Failed to fetch official project levels: {exc}"))
return

by_repo: dict[str, Decimal] = {}
by_name: dict[str, Decimal] = {}

for item in official_data:
raw_level = item.get("level")
try:
level = Decimal(str(raw_level))
except (InvalidOperation, TypeError, ValueError):
continue

if repo := item.get("repo"):
by_repo[repo.lower()] = level

if name := item.get("name"):
by_name[normalize_name(name)] = level

metrics = ProjectHealthMetrics.get_latest_health_metrics().select_related("project")
updated_metrics = []

for metric in metrics:
project = metric.project
official_level = None

if project.owasp_url:
slug = project.owasp_url.rstrip("/").split("/")[-1].lower()
official_level = by_repo.get(slug)

if official_level is None:
official_level = by_name.get(normalize_name(project.name))

if official_level is None:
continue

expected_level = map_level(official_level)
if expected_level is None:
continue

metric.level_non_compliant = project.level != expected_level
updated_metrics.append(metric)

updated_count = len(updated_metrics)

if updated_metrics:
ProjectHealthMetrics.bulk_save(
updated_metrics,
fields=["level_non_compliant"],
)

self.stdout.write(
self.style.SUCCESS(f"Updated level compliance for {updated_count} projects.")
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Generated by Django on 2026-01-14

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("owasp", "0069_alter_project_contribution_data_and_more"),
]

operations = [
migrations.AddField(
model_name="projecthealthmetrics",
name="level_non_compliant",
field=models.BooleanField(
default=False,
verbose_name="Is level non-compliant",
help_text="True when local project level differs from official OWASP level.",
),
),
]
5 changes: 5 additions & 0 deletions backend/apps/owasp/models/project_health_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class Meta:
is_leader_requirements_compliant = models.BooleanField(
verbose_name="Is leader requirements compliant", default=False
)
level_non_compliant = models.BooleanField(
verbose_name="Is level non-compliant",
default=False,
help_text="True when local project level differs from official OWASP level.",
)
last_released_at = models.DateTimeField(verbose_name="Last released at", blank=True, null=True)
last_committed_at = models.DateTimeField(
verbose_name="Last committed at", blank=True, null=True
Expand Down
45 changes: 45 additions & 0 deletions backend/apps/owasp/utils/project_level.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Utilities for mapping OWASP official numeric project levels.

This module acts as a translation layer between the OWASP
project level definitions published upstream and the
Nest internal project classification system.
"""

from decimal import Decimal, InvalidOperation
from typing import cast

from apps.owasp.models.enums.project import ProjectLevel

_LEVEL_MAP = {
Decimal(4): ProjectLevel.FLAGSHIP,
Decimal("3.5"): ProjectLevel.FLAGSHIP,
Decimal(3): ProjectLevel.PRODUCTION,
Decimal(2): ProjectLevel.INCUBATOR,
Decimal(1): ProjectLevel.LAB,
Decimal(0): ProjectLevel.OTHER,
}


def map_level(level: Decimal) -> ProjectLevel | None:
"""Map an OWASP official numeric project level to ProjectLevel.

Args:
level (Decimal): The numeric project level provided by OWASP.

Returns:
ProjectLevel | None: The mapped ProjectLevel value if valid,
otherwise None for unsupported or invalid levels.

"""
try:
parsed_level = Decimal(str(level))
except (InvalidOperation, TypeError, ValueError):
return None

if not parsed_level.is_finite():
return None

if parsed_level < 0:
return None

return cast("ProjectLevel | None", _LEVEL_MAP.get(parsed_level))
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,18 @@ def exists(self):
with (
mock.patch.object(Project, "active_projects", mock_active_projects),
mock.patch("builtins.print") as mock_print,
mock.patch(
"apps.owasp.management.commands.owasp_aggregate_projects.call_command"
) as mock_call_command,
):
command.handle(offset=offset)

if offset == 0:
mock_call_command.assert_any_call("owasp_update_project_level_compliance")
mock_call_command.assert_any_call("owasp_update_project_health_scores")
else:
mock_call_command.assert_not_called()

assert mock_bulk_save.called
assert mock_print.call_count == projects - offset

Expand Down
Loading