Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ sync-data: \

test-backend:
@DOCKER_BUILDKIT=1 docker build \
--cache-from nest-test-backend \
$$(docker image inspect nest-test-backend >/dev/null 2>&1 && echo '--cache-from nest-test-backend') \
-f backend/docker/Dockerfile.test backend \
-t nest-test-backend
@docker run \
Expand Down
4 changes: 4 additions & 0 deletions backend/apps/owasp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ owasp-aggregate-projects:
@echo "Aggregating OWASP projects"
@CMD="python manage.py owasp_aggregate_projects" $(MAKE) exec-backend-command

owasp-check-project-level-compliance:
@echo "Checking OWASP project level compliance"
@CMD="python manage.py owasp_check_project_level_compliance" $(MAKE) exec-backend-command

owasp-create-project-metadata-file:
@echo "Generating metadata"
@CMD="python manage.py owasp_create_project_metadata_file $(entity_key)" $(MAKE) exec-backend-command
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
"""A command to check OWASP project level compliance against the official source of truth."""

import logging
from urllib.request import urlopen

from django.core.management.base import BaseCommand

from apps.owasp.models.project import Project

logger = logging.getLogger(__name__)

# Official OWASP project levels source
PROJECT_LEVELS_URL = "https://raw.githubusercontent.com/OWASP/www-projectchapter-example/main/assets/project_levels.json"


class Command(BaseCommand):
help = "Check OWASP project level compliance against official project_levels.json."

def add_arguments(self, parser) -> None:
"""Add command-line arguments to the parser.

Args:
parser (argparse.ArgumentParser): The argument parser instance.

"""
parser.add_argument(
"--dry-run",
action="store_true",
default=False,
help="Perform a dry run without updating the database.",
)

def fetch_official_project_levels(self) -> dict[str, str]:
"""Fetch and parse the official project levels JSON.

Returns:
dict: Mapping of project keys to their official levels.

"""
try:
with urlopen(PROJECT_LEVELS_URL, timeout=30) as response: # noqa: S310
import json

data = json.loads(response.read().decode("utf-8"))
self.stdout.write(
self.style.SUCCESS(
f"✓ Fetched {len(data)} project levels from official source"
)
)
return data
except Exception as e:
self.stdout.write(self.style.ERROR(f"✗ Failed to fetch project levels: {e}"))
logger.exception("Failed to fetch project levels from %s", PROJECT_LEVELS_URL)
return {}

def normalize_level(self, level: str) -> str:
"""Normalize a level string to match ProjectLevel enum values.

Args:
level: The level string to normalize.

Returns:
str: Normalized level string.

"""
level_mapping = {
"incubator": "incubator",
"lab": "lab",
"production": "production",
"flagship": "flagship",
"labs": "lab", # Handle plural form
"incubators": "incubator", # Handle plural form
"flagships": "flagship", # Handle plural form
}
normalized = level.lower().strip()
return level_mapping.get(normalized, "other")

def handle(self, *args, **options) -> None:
"""Handle the command execution."""
dry_run = options["dry_run"]

self.stdout.write(self.style.WARNING("Starting project level compliance check..."))

if dry_run:
self.stdout.write(self.style.NOTICE("🔍 DRY RUN MODE - No changes will be saved"))

# Fetch official project levels
official_levels = self.fetch_official_project_levels()
if not official_levels:
self.stdout.write(
self.style.ERROR("✗ Could not fetch official project levels. Aborting.")
)
return

# Get all active projects
projects = Project.active_projects.all()
total_projects = projects.count()

projects_to_update = []
compliant_count = 0
non_compliant_count = 0
not_in_official_list_count = 0

self.stdout.write(
self.style.NOTICE(f"\n📊 Checking {total_projects} active projects...\n")
)

for project in projects:
# Extract project key (remove 'www-project-' prefix if present)
project_key = project.key.replace("www-project-", "").lower()

# Check if project exists in official list
if project_key not in official_levels:
# Project not in official list - mark as non-compliant
if project.is_level_compliant:
self.stdout.write(
self.style.WARNING(
f"⚠ {project.name} ({project.key}): Not found in official list"
)
)
project.is_level_compliant = False
projects_to_update.append(project)
non_compliant_count += 1
else:
non_compliant_count += 1
not_in_official_list_count += 1
continue

# Get official level and normalize it
official_level = self.normalize_level(official_levels[project_key])
local_level = project.level.lower()

# Compare levels
if official_level != local_level:
# Levels don't match - mark as non-compliant
if project.is_level_compliant:
self.stdout.write(
self.style.WARNING(
f"⚠ {project.name} ({project.key}): "
f"Level mismatch - Local: {local_level}, Official: {official_level}"
)
)
logger.warning(
"Level mismatch for project %s: local=%s, official=%s",
project.key,
local_level,
official_level,
)
project.is_level_compliant = False
projects_to_update.append(project)
non_compliant_count += 1
else:
non_compliant_count += 1
else:
# Levels match - mark as compliant
if not project.is_level_compliant:
self.stdout.write(
self.style.SUCCESS(
f"✓ {project.name} ({project.key}): Now compliant ({local_level})"
)
)
project.is_level_compliant = True
projects_to_update.append(project)
compliant_count += 1

# Update database
if projects_to_update and not dry_run:
Project.bulk_save(projects_to_update, fields=["is_level_compliant"])
self.stdout.write(
self.style.SUCCESS(f"\n✓ Updated {len(projects_to_update)} projects in database")
)
elif projects_to_update and dry_run:
self.stdout.write(
self.style.NOTICE(
f"\n🔍 Would update {len(projects_to_update)} projects (dry run)"
)
)

# Print summary
self.stdout.write("\n" + "=" * 70)
self.stdout.write(self.style.SUCCESS("\n📈 COMPLIANCE SUMMARY:\n"))
self.stdout.write(f" Total Projects: {total_projects}")
self.stdout.write(self.style.SUCCESS(f" ✓ Compliant: {compliant_count}"))
self.stdout.write(
self.style.WARNING(f" ⚠ Non-Compliant: {non_compliant_count}")
)
self.stdout.write(
self.style.ERROR(f" - Not in official list: {not_in_official_list_count}")
)
mismatch_count = non_compliant_count - not_in_official_list_count
self.stdout.write(self.style.ERROR(f" - Level mismatch: {mismatch_count}"))
self.stdout.write(f" Changes Applied: {len(projects_to_update)}")
self.stdout.write("=" * 70 + "\n")

if non_compliant_count > 0:
self.stdout.write(
self.style.WARNING(
f"\n⚠ {non_compliant_count} non-compliant projects detected. "
"These projects will receive a score penalty."
)
)

self.stdout.write(self.style.SUCCESS("\n✓ Project level compliance check completed."))
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def handle(self, *args, **options):
"unanswered_issues_count": 6.0,
"unassigned_issues_count": 6.0,
}
# Weight for level compliance penalty (deducted if non-compliant)
level_compliance_weight = 8.0

project_health_metrics = []
project_health_requirements = {
Expand All @@ -57,6 +59,11 @@ def handle(self, *args, **options):
if int(getattr(metric, field)) <= int(getattr(requirements, field)):
score += weight

# Apply level compliance bonus or penalty
if metric.project.is_level_compliant:
score += level_compliance_weight
# Non-compliant projects don't get the compliance bonus (penalty)

metric.score = score
project_health_metrics.append(metric)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Generated migration

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("owasp", "0052_remove_entitymember_owasp_entit_member__6e516f_idx_and_more"),
]

operations = [
migrations.AddField(
model_name="project",
name="is_level_compliant",
field=models.BooleanField(
default=True,
verbose_name="Is level compliant",
help_text="Indicates if the project level matches the official OWASP project_levels.json",
),
),
]
5 changes: 5 additions & 0 deletions backend/apps/owasp/models/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ class Meta:

custom_tags = models.JSONField(verbose_name="Custom tags", default=list, blank=True)
track_issues = models.BooleanField(verbose_name="Track issues", default=True)
is_level_compliant = models.BooleanField(
verbose_name="Is level compliant",
default=True,
help_text="Indicates if the project level matches the official OWASP project_levels.json",
)

# GKs.
members = GenericRelation("owasp.EntityMember")
Expand Down
Loading