Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

summarizing multiple similar findings into problems #11432

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dryrunsecurity.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ sensitiveCodepaths:
- 'dojo/metrics/*.py'
- 'dojo/note_type/*.py'
- 'dojo/notes/*.py'
- 'dojo/problem/*.py'
- 'dojo/product/*.py'
- 'dojo/product_type/*.py'
- 'dojo/reports/*.py'
Expand Down
30 changes: 30 additions & 0 deletions dojo/db_migrations/0219_problem_finding_problem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Generated by Django 5.0.8 on 2024-12-23 22:35

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('dojo', '0218_system_settings_enforce_verified_status_and_more'),
]

operations = [
migrations.CreateModel(
name='Problem',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(help_text='A short name or title for the problem.', max_length=255, verbose_name='Name')),
('problem_id', models.CharField(help_text='Problem identifier. This field is used to uniquely identify the problem.', max_length=255, unique=True, verbose_name='Problem ID')),
('created_at', models.DateTimeField(auto_now_add=True, help_text='Timestamp when this problem was created.', verbose_name='Created At')),
('updated_at', models.DateTimeField(auto_now=True, help_text='Timestamp when this problem was last updated.', verbose_name='Updated At')),
('severity', models.CharField(choices=[('Critical', 'Critical'), ('High', 'High'), ('Medium', 'Medium'), ('Low', 'Low'), ('Info', 'Info')], help_text='The severity level of this problem.', max_length=50, verbose_name='Severity')),
],
),
migrations.AddField(
model_name='finding',
name='problem',
field=models.ForeignKey(blank=True, help_text='The problem this finding is related to.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='findings', to='dojo.problem', verbose_name='Problem'),
),
]
36 changes: 36 additions & 0 deletions dojo/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2257,6 +2257,35 @@ class Meta:
def __str__(self):
return f"{self.finding.id}: {self.action}"

class Problem(models.Model):
name = models.CharField(max_length=255,
verbose_name=_("Name"),
help_text=_("A short name or title for the problem."))
problem_id = models.CharField(max_length=255,
unique=True,
null=True,
blank=True,
verbose_name=_("Problem ID"),
help_text=_("Problem identifier. This field is used to uniquely identify the problem."))
created_at = models.DateTimeField(auto_now_add=True,
verbose_name=_("Created At"),
help_text=_("Timestamp when this problem was created."))
updated_at = models.DateTimeField(auto_now=True,
verbose_name=_("Updated At"),
help_text=_("Timestamp when this problem was last updated."))
severity = models.CharField(max_length=50,
choices=[
('Critical', _("Critical")),
('High', _("High")),
('Medium', _("Medium")),
('Low', _("Low")),
('Info', _("Info")),
],
verbose_name=_("Severity"),
help_text=_("The severity level of this problem."))
def __str__(self):
return self.name


class Finding(models.Model):
title = models.CharField(max_length=511,
Expand All @@ -2283,6 +2312,13 @@ class Finding(models.Model):
blank=False,
verbose_name=_("Vulnerability Id"),
help_text=_("An id of a vulnerability in a security advisory associated with this finding. Can be a Common Vulnerabilities and Exposures (CVE) or from other sources."))
problem = models.ForeignKey(Problem,
null=True,
blank=True,
on_delete=models.SET_NULL,
related_name='findings',
verbose_name=_("Problem"),
help_text=_("The problem this finding is related to."))
epss_score = models.FloatField(default=None, null=True, blank=True,
verbose_name=_("EPSS Score"),
help_text=_("EPSS score for the CVE. Describes how likely it is the vulnerability will be exploited in the next 30 days."),
Expand Down
Empty file added dojo/problem/__init__.py
Empty file.
127 changes: 127 additions & 0 deletions dojo/problem/helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import json
import os
import requests

from django.conf import settings

from dojo.models import Problem, Finding

import logging
logger = logging.getLogger(__name__)

MEDIA_ROOT = os.getenv('DD_MEDIA_ROOT', '/app/media')
CACHED_JSON_FILE = os.path.join(MEDIA_ROOT, 'cached_disambiguator.json')

SEVERITY_ORDER = {
'Critical': 5,
'High': 4,
'Medium': 3,
'Low': 2,
'Info': 1
}

def validate_json(data):
if not isinstance(data, dict):
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved
return False
for key, value in data.items():
if not isinstance(key, str) or not isinstance(value, list):
return False
if not all(isinstance(item, str) for item in value):
return False
return True

def download_json(json_url):
response = requests.get(json_url, timeout=5, verify=False)
response.raise_for_status()
LeoOMaia marked this conversation as resolved.
Show resolved Hide resolved
return response.json()

def load_cached_json():
if os.path.exists(CACHED_JSON_FILE):
try:
with open(CACHED_JSON_FILE, 'r') as f:
data = json.load(f)
if validate_json(data):
return data
else:
logger.warning('Cached JSON failed validation.')
except json.JSONDecodeError:
logger.error('Error decoding JSON from cache.')
except Exception as e:
logger.error(f'Unexpected error loading JSON from cache: {e}')
else:
logger.info('Cached JSON file does not exist.')
return None

def save_json_to_cache(data):
logger.info('Saving disambiguator JSON to cache')
with open(CACHED_JSON_FILE, 'w') as f:
json.dump(data, f)

def mapping_script_problem_id(mappings_json_findings):
script_to_problem_mapping = {
script_id: key
for key, script_ids in mappings_json_findings.items()
for script_id in script_ids
}
return script_to_problem_mapping

def load_json(check_cache=True):
try:
if check_cache:
cached_data = load_cached_json()
if cached_data and validate_json(cached_data):
return mapping_script_problem_id(cached_data)

if settings.PROBLEM_MAPPINGS_JSON_URL:
data = download_json(settings.PROBLEM_MAPPINGS_JSON_URL)
if validate_json(data):
save_json_to_cache(data)
return mapping_script_problem_id(data)
else:
logger.error('No disambiguator JSON URL provided.')
except requests.RequestException as e:
logger.error('HTTP error while loading JSON: %s', e)
except json.JSONDecodeError as e:
logger.error('JSON decoding error: %s', e)
except Exception as e:
logger.error('Unexpected error: %s', e)
return {}

def find_or_create_problem(finding, script_to_problem_mapping):
problem_id = script_to_problem_mapping.get(finding.vuln_id_from_tool)
if problem_id:
return _get_or_update_problem(finding, problem_id)

# if the script_id is not in the mapping, create a new one
return _get_or_create_problem_by_script_id(finding)

def _get_or_update_problem(finding, problem_id):
problem = Problem.objects.filter(problem_id=problem_id).first()
if problem:
if SEVERITY_ORDER[finding.severity] > SEVERITY_ORDER[problem.severity]:
_update_problem(problem, finding.title, finding.severity)
return problem

return Problem.objects.create(
name=finding.title,
problem_id=problem_id,
severity=finding.severity
)

def _get_or_create_problem_by_script_id(finding):
related_finding = Finding.objects.filter(vuln_id_from_tool=finding.vuln_id_from_tool).first()
if related_finding and related_finding.problem:
problem = related_finding.problem
if SEVERITY_ORDER[finding.severity] > SEVERITY_ORDER[problem.severity]:
_update_problem(problem, finding.title, finding.severity)
return problem

return Problem.objects.create(
name=finding.title,
severity=finding.severity
)

def _update_problem(problem, name, severity):
problem.name = name
problem.severity = severity
problem.save()
17 changes: 17 additions & 0 deletions dojo/problem/update_mappings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import json
import os
import requests

from dojo.celery import app
from dojo.decorators import dojo_async_task
from dojo.problem.helper import load_json

import logging
logger = logging.getLogger(__name__)


@dojo_async_task
@app.task
def daily_cache_update(**kwargs):
logger.info("Starting daily cache update")
load_json(check_cache=False)
27 changes: 27 additions & 0 deletions dojo/problem/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from django.urls import re_path

from dojo.problem import views

urlpatterns = [
# Listing operations
re_path(
r"^problems/all$",
views.ListProblems.as_view(),
name="all_problems",
),
re_path(
r"^problems/open$",
views.ListOpenProblems.as_view(),
name="open_problems",
),
re_path(
r"^problems/closed$",
views.ListClosedProblems.as_view(),
name="closed_problems",
),
re_path(
r"^problems/(?P<problem_id>\d+)/findings$",
views.ProblemFindings.as_view(),
name="problem_findings",
)
]
120 changes: 120 additions & 0 deletions dojo/problem/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from collections import OrderedDict

from django.http import HttpRequest
from django.shortcuts import get_object_or_404, render
from django.views import View
from django.db.models import Count, Q
from django.core.paginator import Paginator

from dojo.utils import add_breadcrumb
from dojo.models import Problem, Finding, Endpoint

import logging
logger = logging.getLogger(__name__)

class ListProblems(View):
filter_name = "All"

def get_template(self):
return "dojo/problems_list.html"

def get_engagement_id(self):
return getattr(self, "engagement_id", None)

def get_problem_id(self):
return getattr(self, "problem_id", None)

def add_breadcrumbs(self, request: HttpRequest, context: dict):
if "endpoints" in request.GET:
endpoint_ids = request.GET.getlist("endpoints", [])
if len(endpoint_ids) == 1 and endpoint_ids[0]:
endpoint_id = endpoint_ids[0]
endpoint = get_object_or_404(Endpoint, id=endpoint_id)
context["filter_name"] = "Vulnerable Endpoints"
context["custom_breadcrumb"] = OrderedDict([
("Endpoints", reverse("vulnerable_endpoints")),
(endpoint, reverse("view_endpoint", args=(endpoint.id,))),
])
elif not self.get_engagement_id() and not self.get_problem_id():
add_breadcrumb(title="Problems", top_level=not len(request.GET), request=request)

return request, context

def get_problems(self, request: HttpRequest):
queryset = Problem.objects.all().annotate(
findings_count=Count('findings'),
total_script_ids=Count('findings__vuln_id_from_tool', distinct=True)
).distinct()
order_field = request.GET.get('o')
return queryset.order_by(order_field) if order_field else queryset.order_by("id")

def paginate_queryset(self, queryset, request: HttpRequest):
page_size = request.GET.get('page_size', 25) # Default is 25
paginator = Paginator(queryset, page_size)
page_number = request.GET.get('page')
return paginator.get_page(page_number)

def get(self, request: HttpRequest):
problems = self.get_problems(request)
paginated_problems = self.paginate_queryset(problems, request)

context = {
"filter_name": self.filter_name,
"problems": paginated_problems,
}

request, context = self.add_breadcrumbs(request, context)
return render(request, self.get_template(), context)


class ListOpenProblems(ListProblems):
filter_name = "Open"

def get_problems(self, request: HttpRequest):
queryset = Problem.objects.filter(
findings__active=True
).annotate(
findings_count=Count('findings'),
total_script_ids=Count('findings__vuln_id_from_tool', distinct=True)
).distinct()
order_field = request.GET.get('o')
return queryset.order_by(order_field) if order_field else queryset.order_by("id")


class ListClosedProblems(ListProblems):
filter_name = "Closed"

def get_problems(self, request: HttpRequest):
queryset = Problem.objects.annotate(
active_findings=Count('findings', filter=Q(findings__active=True))
).filter(active_findings=0).annotate(
findings_count=Count('findings'),
total_script_ids=Count('findings__vuln_id_from_tool', distinct=True)
).distinct()
order_field = request.GET.get('o')
return queryset.order_by(order_field) if order_field else queryset.order_by("id")



class ProblemFindings(ListProblems):
def get_template(self):
return "dojo/problem_findings.html"

def get_findings(self, request: HttpRequest):
problem = Problem.objects.get(pk=self.problem_id)
queryset = problem.findings.all()
order_field = request.GET.get('o')
return problem.name, queryset.order_by(order_field) if order_field else queryset.order_by("id")

def get(self, request: HttpRequest, problem_id: int):
self.problem_id = problem_id
problem_name, findings = self.get_findings(request)
paginated_findings = self.paginate_queryset(findings, request)

context = {
"problem": problem_name,
"findings": paginated_findings,
}

request, context = self.add_breadcrumbs(request, context)
return render(request, self.get_template(), context)
Loading
Loading