-
-
Notifications
You must be signed in to change notification settings - Fork 264
feature/Implementation of management command. #2073
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 13 commits
949afda
e28ebc5
038c71d
ff4df3b
d2fea99
b652e64
50127c3
4e11688
75dffda
df2f113
d85e0d5
d3bd32b
1365beb
41598c9
9199bd2
64ee6bb
e1c923b
8f4e3a4
021ec86
b7a7eac
79acaec
2748680
d5aed68
b886714
55bcf9d
1e20a06
9b3205d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| """OWASP app constants.""" | ||
|
|
||
| OWASP_ORGANIZATION_NAME = "OWASP" | ||
| OWASP_PROJECT_LEVELS_URL = "https://raw.githubusercontent.com/OWASP/owasp.github.io/main/_data/project_levels.json" | ||
divyanshu-vr marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| """A command to detect and flag projects with non-compliant level assignments.""" | ||
divyanshu-vr marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| import logging | ||
| import time | ||
|
|
||
| from django.core.management.base import BaseCommand, CommandError | ||
|
|
||
| from apps.owasp.models.project_health_metrics import ProjectHealthMetrics | ||
| from apps.owasp.utils.compliance_detector import detect_and_update_compliance | ||
| from apps.owasp.utils.project_level_fetcher import fetch_official_project_levels | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class Command(BaseCommand): | ||
| help = "Detect and flag projects with non-compliant level assignments" | ||
|
|
||
| def add_arguments(self, parser): | ||
| """Add command line arguments.""" | ||
| parser.add_argument( | ||
| "--dry-run", | ||
| action="store_true", | ||
| help="Show what would be changed without making actual updates", | ||
| ) | ||
| parser.add_argument( | ||
| "--verbose", action="store_true", help="Enable verbose output for debugging" | ||
| ) | ||
| parser.add_argument( | ||
| "--timeout", | ||
| type=int, | ||
| default=30, | ||
| help="HTTP timeout for fetching project levels (default: 30 seconds)", | ||
| ) | ||
|
|
||
| def handle(self, *args, **options): | ||
| """Execute compliance detection workflow.""" | ||
| start_time = time.perf_counter() | ||
| dry_run = options["dry_run"] | ||
| verbose = options["verbose"] | ||
| timeout = options["timeout"] | ||
| # Configure logging level based on verbose flag | ||
| if verbose: | ||
| logging.getLogger("apps.owasp.utils").setLevel(logging.DEBUG) | ||
| try: | ||
| self.stdout.write("Starting OWASP project level compliance detection...") | ||
| if dry_run: | ||
| self.stdout.write("DRY RUN MODE: No changes will be made to the database") | ||
| # Step 1: Fetch official project levels | ||
| self.stdout.write("Fetching official project levels from OWASP GitHub repository...") | ||
| official_levels = fetch_official_project_levels(timeout=timeout) | ||
|
|
||
| if official_levels is None or not official_levels: | ||
| msg = "Failed to fetch official project levels or received empty payload" | ||
| self.stderr.write(msg) | ||
| raise CommandError(msg) | ||
|
|
||
| self.stdout.write( | ||
| f"Successfully fetched {len(official_levels)} official project levels" | ||
| ) | ||
| # Steps 2-4: Detect and update in one procedural call | ||
divyanshu-vr marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| self.stdout.write("Detecting and updating compliance issues...") | ||
| updated_count = detect_and_update_compliance(official_levels, dry_run=dry_run) | ||
| # Recompute a lightweight summary from latest health metrics | ||
| latest_metrics = ProjectHealthMetrics.get_latest_health_metrics() | ||
| total = len(latest_metrics) | ||
| compliant = sum(1 for m in latest_metrics if m.is_level_compliant) | ||
| non_compliant = total - compliant | ||
| compliance_rate = (compliant / total * 100) if total else 0.0 | ||
| # Step 5: Summary | ||
| execution_time = time.perf_counter() - start_time | ||
| self.stdout.write(f"\nCompliance detection completed in {execution_time:.2f}s") | ||
| self.stdout.write(f"Summary: {compliant} compliant, {non_compliant} non-compliant") | ||
| self.stdout.write(f"Compliance rate: {compliance_rate:.1f}%") | ||
| if dry_run: | ||
| self.stdout.write(f"DRY RUN: Would update {updated_count} projects") | ||
| else: | ||
| self.stdout.write(f"Updated {updated_count} projects") | ||
| # Log detailed summary for monitoring | ||
| logger.info( | ||
| "Compliance detection completed successfully", | ||
| extra={ | ||
| "execution_time": f"{execution_time:.2f}s", | ||
| "dry_run": dry_run, | ||
| "total_projects": total, | ||
| "compliant_projects": compliant, | ||
| "non_compliant_projects": non_compliant, | ||
| "compliance_rate": f"{compliance_rate:.1f}%", | ||
| }, | ||
| ) | ||
| except Exception as e: | ||
| execution_time = time.perf_counter() - start_time | ||
| error_msg = f"Compliance detection failed after {execution_time:.2f}s: {e!s}" | ||
|
|
||
| logger.exception( | ||
| "Compliance detection failed", | ||
| extra={ | ||
| "execution_time_s": round(execution_time, 2), | ||
| "error": e.__class__.__name__, | ||
| }, | ||
| ) | ||
| raise CommandError(error_msg) from e | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| # Generated by Django 5.2.5 on 2025-08-12 21:00 | ||
|
|
||
| from django.db import migrations, models | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
|
|
||
| dependencies = [ | ||
| ('owasp', '0046_merge_0045_badge_0045_project_audience'), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.AddField( | ||
| model_name='projecthealthmetrics', | ||
| name='is_level_compliant', | ||
| field=models.BooleanField(default=True, help_text="Whether the project's local level matches the official OWASP level", verbose_name='Is project level compliant'), | ||
| ), | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| # Generated by Django 5.2.5 on 2025-08-14 15:17 | ||
|
|
||
| from django.db import migrations, models | ||
|
|
||
|
|
||
| class Migration(migrations.Migration): | ||
| dependencies = [ | ||
| ("owasp", "0047_add_is_level_compliant_field"), | ||
| ] | ||
|
|
||
| operations = [ | ||
| migrations.AddField( | ||
| model_name="projecthealthrequirements", | ||
| name="compliance_penalty_weight", | ||
| field=models.FloatField( | ||
| default=10.0, | ||
| help_text="Percentage penalty applied to non-compliant projects (0-100)", | ||
| verbose_name="Compliance penalty weight (%)", | ||
| ), | ||
| ), | ||
| migrations.AddConstraint( | ||
| model_name="projecthealthrequirements", | ||
| constraint=models.CheckConstraint( | ||
| name="owasp_compliance_penalty_weight_0_100", | ||
| check=models.Q(compliance_penalty_weight__gte=0.0) | ||
| & models.Q(compliance_penalty_weight__lte=100.0), | ||
| ), | ||
| ), | ||
| ] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,6 +57,11 @@ class Meta: | |
| unassigned_issues_count = models.PositiveIntegerField( | ||
| verbose_name="Unassigned issues", default=0 | ||
| ) | ||
| compliance_penalty_weight = models.FloatField( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's out of the feature scope. We have the scoring system already in place and there is no need to introduce additional fields. |
||
| verbose_name="Compliance penalty weight (%)", | ||
| default=10.0, | ||
| help_text="Percentage penalty applied to non-compliant projects (0-100)", | ||
| ) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Enforce 0–100 bounds for percentage with validators and a DB check constraint Without validation, invalid values (e.g., negative or >100) can slip into the DB and break scoring semantics. Add field-level validators and a DB-level CheckConstraint. Apply this diff within this field: - compliance_penalty_weight = models.FloatField(
- verbose_name="Compliance penalty weight (%)",
- default=10.0,
- help_text="Percentage penalty applied to non-compliant projects (0-100)",
- )
+ compliance_penalty_weight = models.FloatField(
+ verbose_name="Compliance penalty weight (%)",
+ default=10.0,
+ help_text="Percentage penalty applied to non-compliant projects (0-100)",
+ validators=[MinValueValidator(0.0), MaxValueValidator(100.0)],
+ )Additionally, add the import and a DB-level constraint (update migration accordingly): # at top of file
from django.core.validators import MaxValueValidator, MinValueValidator# extend Meta in this model
class Meta:
db_table = "owasp_project_health_requirements"
verbose_name_plural = "Project Health Requirements"
ordering = ["level"]
constraints = [
models.CheckConstraint(
name="owasp_compliance_penalty_weight_0_100",
check=models.Q(compliance_penalty_weight__gte=0.0)
& models.Q(compliance_penalty_weight__lte=100.0),
)
] |
||
|
|
||
| def __str__(self) -> str: | ||
| """Project health requirements human readable representation.""" | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| """OWASP utilities.""" | ||
|
|
||
| from .compliance_detector import detect_and_update_compliance | ||
| from .project_level_fetcher import fetch_official_project_levels | ||
|
|
||
| __all__ = ["detect_and_update_compliance", "fetch_official_project_levels"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| """Service for detecting project level compliance issues.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
|
|
||
| from django.db import transaction | ||
|
|
||
| from apps.owasp.models.project_health_metrics import ProjectHealthMetrics | ||
|
|
||
| logger: logging.Logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def detect_and_update_compliance( | ||
| official_levels: dict[str, str], | ||
| dry_run: bool = False, | ||
| ) -> int: | ||
| """Compare official levels with local levels and update compliance status. | ||
|
|
||
| Args: | ||
| official_levels (dict[str, str]): Dict of project names to official levels. | ||
| dry_run (bool): If True, preview changes without writing. | ||
|
|
||
| Returns: | ||
| int: Number of projects that would be/were updated. | ||
| """ | ||
| logger.info("Starting project level compliance detection") | ||
| # Normalize official levels by stripping whitespace and normalizing case | ||
| normalized_official_levels = { | ||
| k.strip().lower(): v.strip().lower() | ||
| for k, v in official_levels.items() | ||
| } | ||
| # Get all active projects | ||
| # Latest metrics already filter to active projects (see get_latest_health_metrics) | ||
| with transaction.atomic(): | ||
| # Get latest health metrics for all projects | ||
| latest_metrics = ProjectHealthMetrics.get_latest_health_metrics().select_related("project") | ||
| metrics_to_update = [] | ||
| for metric in latest_metrics: | ||
| project = metric.project | ||
| project_name = project.name | ||
| local_level = str(project.level).strip().lower() | ||
|
|
||
| # Compare official level with local level using normalized values | ||
| normalized_project_name = project_name.strip().lower() | ||
| if normalized_project_name in normalized_official_levels: | ||
| normalized_official_level = normalized_official_levels[normalized_project_name] | ||
| is_compliant = local_level == normalized_official_level | ||
|
|
||
| # Update compliance status if it has changed | ||
| if metric.is_level_compliant != is_compliant: | ||
| metric.is_level_compliant = is_compliant | ||
| metrics_to_update.append(metric) | ||
| logger.info( | ||
| "Project compliance status changed", | ||
| extra={ | ||
| "project": project_name, | ||
| "local_level": local_level, | ||
| "official_level": normalized_official_level, | ||
| "is_compliant": is_compliant, | ||
| }, | ||
| ) | ||
| # Project not found in official data - mark as non-compliant | ||
| elif metric.is_level_compliant: | ||
| metric.is_level_compliant = False | ||
| metrics_to_update.append(metric) | ||
| logger.warning( | ||
| "Project not found in official data, marking as non-compliant", | ||
| extra={"project": project_name, "local_level": local_level}, | ||
| ) | ||
| # Bulk update compliance status (or preview in dry-run) | ||
| if metrics_to_update: | ||
| if dry_run: | ||
| logger.info( | ||
| "DRY RUN: would update compliance status for projects", | ||
| extra={"updated_count": len(metrics_to_update)}, | ||
| ) | ||
| else: | ||
| ProjectHealthMetrics.objects.bulk_update( | ||
| metrics_to_update, | ||
| ["is_level_compliant"], | ||
| batch_size=100, | ||
| ) | ||
| logger.info( | ||
| "Updated compliance status for projects", | ||
| extra={"updated_count": len(metrics_to_update)}, | ||
| ) | ||
| else: | ||
| logger.info("No compliance status changes needed") | ||
|
|
||
| return len(metrics_to_update) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| """Service for fetching OWASP project levels from GitHub repository.""" | ||
|
|
||
| import logging | ||
|
|
||
| import requests | ||
| from requests.exceptions import RequestException | ||
|
|
||
| from apps.owasp.constants import OWASP_PROJECT_LEVELS_URL | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| def fetch_official_project_levels(timeout: int = 30) -> dict[str, str] | None: | ||
| """Fetch project levels from OWASP GitHub repository. | ||
|
|
||
| Args: | ||
| timeout: HTTP request timeout in seconds | ||
|
|
||
| Returns: | ||
| Dict mapping project names to their official levels, or None if fetch fails | ||
|
|
||
| """ | ||
| try: | ||
| response = requests.get( | ||
| OWASP_PROJECT_LEVELS_URL, | ||
| timeout=timeout, | ||
| headers={"Accept": "application/json"}, | ||
| ) | ||
| response.raise_for_status() | ||
| data = response.json() | ||
| if not isinstance(data, list): | ||
| logger.exception( | ||
| "Invalid project levels data format", | ||
| extra={"expected": "list", "got": type(data).__name__}, | ||
| ) | ||
| return None | ||
|
|
||
| # Convert the list to a dict mapping project names to their levels | ||
| project_levels = {} | ||
| for entry in data: | ||
| if not isinstance(entry, dict): | ||
| continue | ||
| project_name = entry.get("name") | ||
| level = entry.get("level") | ||
| if ( | ||
| isinstance(project_name, str) | ||
| and isinstance(level, (str, int, float)) | ||
| and project_name.strip() | ||
| ): | ||
| project_levels[project_name.strip()] = str(level) | ||
|
|
||
| return project_levels | ||
|
|
||
| except (RequestException, ValueError) as e: | ||
| logger.exception( | ||
| "Failed to fetch project levels", | ||
| extra={"url": OWASP_PROJECT_LEVELS_URL, "error": str(e)}, | ||
| ) | ||
| return None |
Uh oh!
There was an error while loading. Please reload this page.