Skip to content
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
949afda
WIP: Implementation of management command.
SUPERGAMERDIV Aug 14, 2025
e28ebc5
fix project_levels_url
divyanshu-vr Aug 14, 2025
038c71d
avoid string literal in raise
divyanshu-vr Aug 14, 2025
ff4df3b
wrap db updates in transaction
divyanshu-vr Aug 14, 2025
d2fea99
use logger.exception
divyanshu-vr Aug 14, 2025
b652e64
Apply suggestions from code review
divyanshu-vr Aug 14, 2025
50127c3
Apply suggestions from code review
divyanshu-vr Aug 14, 2025
4e11688
Apply suggestions from code review
divyanshu-vr Aug 14, 2025
75dffda
coderabbit suggestions fixed
SUPERGAMERDIV Aug 14, 2025
df2f113
Merge branch 'feature/ProjectLevelComplianceDetection' of https://git…
SUPERGAMERDIV Aug 14, 2025
d85e0d5
conflict fixes
SUPERGAMERDIV Aug 14, 2025
d3bd32b
pre-commit fixes
SUPERGAMERDIV Aug 14, 2025
1365beb
fixes and added CronJob Scheduling
SUPERGAMERDIV Aug 18, 2025
41598c9
coderabbit fixes
SUPERGAMERDIV Aug 18, 2025
9199bd2
Merge branch 'OWASP:main' into feature/ProjectLevelComplianceDetection
divyanshu-vr Aug 21, 2025
64ee6bb
made changes asked.
SUPERGAMERDIV Aug 21, 2025
e1c923b
Merge branch 'feature/ProjectLevelComplianceDetection' of https://git…
SUPERGAMERDIV Aug 21, 2025
8f4e3a4
Update backend/apps/owasp/management/commands/owasp_update_project_he…
divyanshu-vr Aug 21, 2025
021ec86
fixed sonarqube issues
SUPERGAMERDIV Aug 23, 2025
b7a7eac
Update __init__.py
divyanshu-vr Aug 24, 2025
79acaec
Update __init__.py
divyanshu-vr Aug 24, 2025
2748680
sonar fixes
SUPERGAMERDIV Aug 26, 2025
d5aed68
Merge branch 'feature/ProjectLevelComplianceDetection' of https://git…
SUPERGAMERDIV Aug 26, 2025
b886714
fixed make check errors
SUPERGAMERDIV Aug 28, 2025
55bcf9d
fixed sonarqube and precommit errors
SUPERGAMERDIV Aug 28, 2025
1e20a06
made coderabbit suggestions
SUPERGAMERDIV Aug 28, 2025
9b3205d
test fixes
SUPERGAMERDIV Aug 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions backend/apps/owasp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,7 @@ owasp-update-events:
owasp-update-sponsors:
@echo "Getting OWASP sponsors data"
@CMD="python manage.py owasp_update_sponsors" $(MAKE) exec-backend-command

owasp-detect-project-level-compliance:
@echo "Detecting OWASP project level compliance"
@CMD="python manage.py owasp_detect_project_level_compliance" $(MAKE) exec-backend-command
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""A command to detect and report project level compliance status."""

import logging
from io import StringIO

from django.core.management.base import BaseCommand

from apps.owasp.models.project import Project
from apps.owasp.models.project_health_metrics import ProjectHealthMetrics

logger = logging.getLogger(__name__)


class Command(BaseCommand):
"""Command to detect and report project level compliance status."""

help = "Detect and report projects with non-compliant level assignments"

def add_arguments(self, parser):
"""Add command line arguments."""
parser.add_argument(
"--verbose",
action="store_true",
help="Enable verbose output showing all projects",
)

def handle(self, *args, **options):
"""Execute compliance detection and reporting."""
verbose = options["verbose"]

self.stdout.write("Analyzing project level compliance status...")

# Get all active projects
active_projects = Project.objects.filter(is_active=True).select_related()

compliant_projects = []
non_compliant_projects = []

for project in active_projects:
if project.is_level_compliant:
compliant_projects.append(project)
if verbose:
self.stdout.write(
f"✓ {project.name}: {project.level} (matches official)"
)
else:
non_compliant_projects.append(project)
self.stdout.write(
self.style.WARNING(
f"✗ {project.name}: Local={project.level}, Official={project.project_level_official}"
)
)

# Summary statistics
total_projects = len(active_projects)
compliant_count = len(compliant_projects)
non_compliant_count = len(non_compliant_projects)
compliance_rate = (compliant_count / total_projects * 100) if total_projects else 0.0

self.stdout.write("\n" + "="*60)
self.stdout.write("PROJECT LEVEL COMPLIANCE SUMMARY")
self.stdout.write("="*60)
self.stdout.write(f"Total active projects: {total_projects}")
self.stdout.write(f"Compliant projects: {compliant_count}")
self.stdout.write(f"Non-compliant projects: {non_compliant_count}")
self.stdout.write(f"Compliance rate: {compliance_rate:.1f}%")

if non_compliant_count > 0:
self.stdout.write(f"\n{self.style.WARNING('⚠ WARNING: Found ' + str(non_compliant_count) + ' non-compliant projects')}")
self.stdout.write("These projects will receive score penalties in the next health score update.")
else:
self.stdout.write(f"\n{self.style.SUCCESS('✓ All projects are level compliant!')}")

# Log summary for monitoring
logger.info(
"Project level compliance analysis completed",
extra={
"total_projects": total_projects,
"compliant_projects": compliant_count,
"non_compliant_projects": non_compliant_count,
"compliance_rate": f"{compliance_rate:.1f}%",
},
)

# Check if official levels are populated
default_level = Project._meta.get_field('project_level_official').default
projects_without_official_level = sum(
1 for project in active_projects
if project.project_level_official == default_level
)

if projects_without_official_level > 0:
self.stdout.write(
f"\n{self.style.NOTICE('ℹ INFO: ' + str(projects_without_official_level) + ' projects have default official levels')}"
)
self.stdout.write("Run 'owasp_update_project_health_metrics' to sync official levels from OWASP GitHub.")
Original file line number Diff line number Diff line change
@@ -1,15 +1,146 @@
"""A command to update OWASP project health metrics."""

import logging

import requests
from django.core.management.base import BaseCommand
from requests.exceptions import RequestException

from apps.owasp.models.project import Project
from apps.owasp.models.project_health_metrics import ProjectHealthMetrics

logger = logging.getLogger(__name__)

OWASP_PROJECT_LEVELS_URL = "https://raw.githubusercontent.com/OWASP/owasp.github.io/main/_data/project_levels.json"


class Command(BaseCommand):
help = "Update OWASP project health metrics."

def add_arguments(self, parser):
"""Add command line arguments."""
parser.add_argument(
"--skip-official-levels",
action="store_true",
help="Skip fetching official project levels from OWASP GitHub repository",
)
parser.add_argument(
"--timeout",
type=int,
default=30,
help="HTTP timeout for fetching project levels (default: 30 seconds)",
)

def fetch_official_project_levels(self, timeout: int = 30) -> dict[str, str] | None:
"""Fetch project levels from OWASP GitHub repository.

Args:
timeout: HTTP request timeout in seconds

Returns:
Dict mapping project names to their official levels, or None if fetch fails
"""
try:
response = requests.get(
OWASP_PROJECT_LEVELS_URL,
timeout=timeout,
headers={"Accept": "application/json"},
)
response.raise_for_status()
data = response.json()
if not isinstance(data, list):
logger.exception(
"Invalid project levels data format",
extra={"expected": "list", "got": type(data).__name__},
)
return None

# Convert the list to a dict mapping project names to their levels
project_levels = {}
for entry in data:
if not isinstance(entry, dict):
continue
project_name = entry.get("name")
level = entry.get("level")
if (
isinstance(project_name, str)
and isinstance(level, (str, int, float))
and project_name.strip()
):
project_levels[project_name.strip()] = str(level)

return project_levels

except (RequestException, ValueError) as e:
logger.exception(
"Failed to fetch project levels",
extra={"url": OWASP_PROJECT_LEVELS_URL, "error": str(e)},
)
return None

def update_official_levels(self, official_levels: dict[str, str]) -> int:
"""Update official levels for projects.

Args:
official_levels: Dict mapping project names to their official levels

Returns:
Number of projects updated
"""
updated_count = 0
projects_to_update = []

# Normalize official levels by stripping whitespace and normalizing case
normalized_official_levels = {
k.strip().lower(): v.strip().lower()
for k, v in official_levels.items()
}

for project in Project.objects.filter(is_active=True):
normalized_project_name = project.name.strip().lower()
if normalized_project_name in normalized_official_levels:
official_level = normalized_official_levels[normalized_project_name]
# Map string levels to enum values
level_mapping = {
"incubator": "incubator",
"lab": "lab",
"production": "production",
"flagship": "flagship",
"2": "incubator",
"3": "lab",
"3.5": "production",
"4": "flagship",
}
mapped_level = level_mapping.get(official_level, "other")

if project.project_level_official != mapped_level:
project.project_level_official = mapped_level
projects_to_update.append(project)
updated_count += 1

if projects_to_update:
Project.bulk_save(projects_to_update, fields=["project_level_official"])
self.stdout.write(f"Updated official levels for {updated_count} projects")
else:
self.stdout.write("No official level updates needed")

return updated_count

def handle(self, *args, **options):
skip_official_levels = options["skip_official_levels"]
timeout = options["timeout"]

# Step 1: Fetch and update official project levels (unless skipped)
if not skip_official_levels:
self.stdout.write("Fetching official project levels from OWASP GitHub repository...")
official_levels = self.fetch_official_project_levels(timeout=timeout)
if official_levels:
self.stdout.write(f"Successfully fetched {len(official_levels)} official project levels")
self.update_official_levels(official_levels)
else:
self.stdout.write(self.style.WARNING("Failed to fetch official project levels, continuing without updates"))

# Step 2: Update project health metrics
metric_project_field_mapping = {
"contributors_count": "contributors_count",
"created_at": "created_at",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,33 @@ def handle(self, *args, **options):
if int(getattr(metric, field)) <= int(getattr(requirements, field)):
score += weight

metric.score = score
# Fetch requirements for this project level, skip if missing
requirements = project_health_requirements.get(metric.project.level)
if requirements is None:
self.stdout.write(
self.style.WARNING(
f"Missing ProjectHealthRequirements for level '{metric.project.level}' — "
f"skipping scoring for {metric.project.name}"
)
)
continue

# Apply compliance penalty if project is not level compliant
if not metric.project.is_level_compliant:
penalty_percentage = float(getattr(requirements, "compliance_penalty_weight", 0.0))
# Clamp to [0, 100]
penalty_percentage = max(0.0, min(100.0, penalty_percentage))
penalty_amount = score * (penalty_percentage / 100.0)
score = max(0.0, score - penalty_amount)
self.stdout.write(
self.style.WARNING(
f"Applied {penalty_percentage}% compliance penalty to {metric.project.name} "
f"(penalty: {penalty_amount:.2f}, final score: {score:.2f}) "
f"[Local: {metric.project.level}, Official: {metric.project.project_level_official}]"
)
)
# Ensure score stays within bounds (0-100)
metric.score = max(0.0, min(100.0, score))
project_health_metrics.append(metric)

ProjectHealthMetrics.bulk_save(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.2.5 on 2025-08-12 21:00

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('owasp', '0046_merge_0045_badge_0045_project_audience'),
]

operations = [
migrations.AddField(
model_name='projecthealthmetrics',
name='is_level_compliant',
field=models.BooleanField(default=True, help_text="Whether the project's local level matches the official OWASP level", verbose_name='Is project level compliant'),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 5.2.5 on 2025-08-14 15:17

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("owasp", "0047_add_is_level_compliant_field"),
]

operations = [
migrations.AddField(
model_name="projecthealthrequirements",
name="compliance_penalty_weight",
field=models.FloatField(
default=10.0,
help_text="Percentage penalty applied to non-compliant projects (0-100)",
verbose_name="Compliance penalty weight (%)",
),
),
migrations.AddConstraint(
model_name="projecthealthrequirements",
constraint=models.CheckConstraint(
name="owasp_compliance_penalty_weight_0_100",
check=models.Q(compliance_penalty_weight__gte=0.0)
& models.Q(compliance_penalty_weight__lte=100.0),
),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Generated by Django 5.2.5 on 2025-08-18 12:29

import django.core.validators
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('owasp', '0048_add_compliance_penalty_weight'),
]

operations = [
migrations.AlterField(
model_name='projecthealthrequirements',
name='compliance_penalty_weight',
field=models.FloatField(default=10.0, help_text='Percentage penalty applied to non-compliant projects (0-100)', validators=[django.core.validators.MinValueValidator(0.0), django.core.validators.MaxValueValidator(100.0)], verbose_name='Compliance penalty weight (%)'),
),
]
12 changes: 12 additions & 0 deletions backend/apps/owasp/models/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ class Meta:
default=ProjectLevel.OTHER,
)
level_raw = models.CharField(verbose_name="Level raw", max_length=50, default="")
project_level_official = models.CharField(
verbose_name="Official Level",
max_length=20,
choices=ProjectLevel.choices,
default=ProjectLevel.OTHER,
help_text="Official project level from OWASP GitHub repository",
)

type = models.CharField(
verbose_name="Type",
Expand Down Expand Up @@ -151,6 +158,11 @@ def is_leader_requirements_compliant(self) -> bool:
# Have multiple Project Leaders who are not all employed by the same company.
return self.leaders_count > 1

@property
def is_level_compliant(self) -> bool:
"""Indicate whether project level matches the official OWASP level."""
return self.level == self.project_level_official

@property
def is_tool_type(self) -> bool:
"""Indicate whether project has TOOL type."""
Expand Down
Loading