Skip to content

Commit

Permalink
Merge pull request #1016 from GitGuardian/mmillet/-/refactor_secret_i…
Browse files Browse the repository at this point in the history
…gnoring

chore: move secret ignoring logic inside the scanner
  • Loading branch information
agateau-gg authored Nov 28, 2024
2 parents 40bcd32 + 4fb68d1 commit bb6af65
Show file tree
Hide file tree
Showing 18 changed files with 325 additions and 494 deletions.
44 changes: 4 additions & 40 deletions ggshield/core/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import math
import operator
import re
from typing import Dict, Iterable, List, Optional, Pattern, Set
from typing import Dict, Iterable, List, Pattern, Set

from click import UsageError
from pygitguardian.models import Match, PolicyBreak, ScanResult
from pygitguardian.models import Match, PolicyBreak

from ggshield.core.types import IgnoredMatch

Expand All @@ -22,12 +22,12 @@
MAXIMUM_CENSOR_LENGTH = 60


def is_ignored(
def is_in_ignored_matches(
policy_break: PolicyBreak,
matches_ignore: Iterable[IgnoredMatch],
) -> bool:
"""
is_ignored checks if a occurrence is ignored.
is_in_ignored_matches checks if a occurrence is ignored.
There are 2 ways of ignoring a occurrence:
- matching the occurrence sha
- matching one of the match.match values
Expand All @@ -47,42 +47,6 @@ def is_ignored(
return False


def remove_ignored_from_result(
scan_result: ScanResult, matches_ignore: Iterable[IgnoredMatch]
) -> None:
"""
remove_ignored removes occurrences from a Scan Result based on a sha
made from its matches.
:param scan_result: ScanResult to filter
:param matches_ignore: match SHAs or plaintext matches to filter out
"""

scan_result.policy_breaks = [
policy_break
for policy_break in scan_result.policy_breaks
if not is_ignored(policy_break, matches_ignore)
]

scan_result.policy_break_count = len(scan_result.policy_breaks)


def remove_results_from_ignore_detectors(
scan_result: ScanResult,
ignored_detectors: Optional[Set[str]] = None,
) -> None:
if not ignored_detectors:
return

scan_result.policy_breaks = [
policy_break
for policy_break in scan_result.policy_breaks
if policy_break.break_type not in ignored_detectors
]

scan_result.policy_break_count = len(scan_result.policy_breaks)


def get_ignore_sha(policy_break: PolicyBreak) -> str:
hashable = "".join(
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,16 @@ class SecretGitLabWebUIOutputHandler(SecretOutputHandler):
use_stderr = True

def _process_scan_impl(self, scan: SecretScanCollection) -> str:
results = list(scan.get_all_results())
results = scan.get_all_results()

policy_breaks_to_report = [
policy_break for result in results for policy_break in result.policy_breaks
]

# If no secrets or no new secrets were found
if not results or (self.ignore_known_secrets and not scan.has_new_secrets):
if len(policy_breaks_to_report) == 0:
return ""

policy_breaks_to_report = []
for result in results:
if not self.ignore_known_secrets:
# Populate GL-HOOK-ERR with all policy breaks found
policy_breaks_to_report += result.scan.policy_breaks
else:
for policy_break in result.scan.policy_breaks:
known = policy_break.known_secret
# Populate GL-HOOK-ERR with only new policy breaks
if not known:
policy_breaks_to_report.append(policy_break)

# Use a set to ensure we do not report duplicate incidents.
# (can happen when the secret is present in both the old and the new version of
# the document)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def create_scan_dict(
if scan.extra_info:
scan_dict["extra_info"] = scan.extra_info

if top and scan.has_results:
if top:
scan_dict["secrets_engine_version"] = VERSIONS.secrets_engine_version

if scan.results:
Expand Down Expand Up @@ -74,7 +74,7 @@ def process_result(
"total_occurrences": 0,
"total_incidents": 0,
}
sha_dict = group_policy_breaks_by_ignore_sha(result.scan.policy_breaks)
sha_dict = group_policy_breaks_by_ignore_sha(result.policy_breaks)
result_dict["total_incidents"] = len(sha_dict)

if not self.show_secrets:
Expand Down
5 changes: 1 addition & 4 deletions ggshield/verticals/secret/output/secret_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ def _process_scan_impl(self, scan: SecretScanCollection) -> str:
raise NotImplementedError()

def _get_exit_code(self, scan: SecretScanCollection) -> ExitCode:
if self.ignore_known_secrets:
if scan.has_new_secrets:
return ExitCode.SCAN_FOUND_PROBLEMS
elif scan.has_secrets:
if scan.total_policy_breaks_count > 0:
return ExitCode.SCAN_FOUND_PROBLEMS
return ExitCode.SUCCESS
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _create_sarif_results(
per policy break.
"""
for result in results:
for policy_break in result.scan.policy_breaks:
for policy_break in result.policy_breaks:
yield _create_sarif_result_dict(result.url, policy_break, incident_details)


Expand Down
67 changes: 29 additions & 38 deletions ggshield/verticals/secret/output/secret_text_output_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
pluralize,
translate_validity,
)
from ggshield.verticals.secret.secret_scanner import IgnoreReason

from ..extended_match import ExtendedMatch
from ..secret_scan_collection import Result, SecretScanCollection
Expand All @@ -38,37 +39,35 @@ def _process_scan_impl(self, scan: SecretScanCollection) -> str:
scan_buf.write(secrets_engine_version())
scan_buf.write(processed_scan_results)
if not processed_scan_results:

scan_buf.write(
no_new_leak_message()
if (self.ignore_known_secrets and scan.known_secrets_count)
if self.ignore_known_secrets
else no_leak_message()
)

if self.ignore_known_secrets and scan.known_secrets_count > 0:
known_secrets_count = sum(
result.ignored_policy_breaks_count_by_reason.get(
IgnoreReason.KNOWN_SECRET, 0
)
for result in scan.get_all_results()
)
if self.ignore_known_secrets and known_secrets_count > 0:
scan_buf.write(
f"\nWarning: {scan.known_secrets_count} {pluralize('secret', scan.known_secrets_count)} ignored "
f"because {pluralize('it is', scan.known_secrets_count, 'they are')} already known by your "
f"\nWarning: {known_secrets_count} {pluralize('secret', known_secrets_count)} ignored "
f"because {pluralize('it is', known_secrets_count, 'they are')} already known by your "
f"GitGuardian dashboard and you used the `--ignore-known-secrets` option.\n"
)

if self.verbose:
scan_buf.write(self.process_scan_results(scan, True))
else:
scan_buf.write("Use `--verbose` for more details.\n")

# TODO: display that using --all-secrets will display those
return scan_buf.getvalue()

def process_scan_results(
self, scan: SecretScanCollection, show_only_known_secrets: bool = False
) -> str:
def process_scan_results(self, scan: SecretScanCollection) -> str:
"""Iterate through the scans and sub-scan results to prepare the display."""
results_buf = StringIO()
if scan.results:
current_result_buf = StringIO()
for result in scan.results.results:
current_result_buf.write(
self.process_result(result, show_only_known_secrets)
)
current_result_buf.write(self.process_result(result))
current_result_string = current_result_buf.getvalue()

# We want to show header when at least one result is not empty
Expand All @@ -79,16 +78,12 @@ def process_scan_results(

if scan.scans:
for sub_scan in scan.scans:
inner_scan_str = self.process_scan_results(
sub_scan, show_only_known_secrets
)
inner_scan_str = self.process_scan_results(sub_scan)
results_buf.write(inner_scan_str)

return results_buf.getvalue()

def process_result(
self, result: Result, show_only_known_secrets: bool = False
) -> str:
def process_result(self, result: Result) -> str:
"""
Build readable message on the found incidents.
Expand All @@ -99,32 +94,28 @@ def process_result(
"""
result_buf = StringIO()

sha_dict = group_policy_breaks_by_ignore_sha(result.scan.policy_breaks)
sha_dict = group_policy_breaks_by_ignore_sha(result.policy_breaks)

if not self.show_secrets:
result.censor()

number_of_displayed_secrets = 0
for ignore_sha, policy_breaks in sha_dict.items():
known_secret = policy_breaks[0].known_secret
if (
(not known_secret and not show_only_known_secrets)
or (known_secret and show_only_known_secrets)
or not self.ignore_known_secrets
):
number_of_displayed_secrets += 1
number_of_displayed_secrets += 1

result_buf.write(
policy_break_header(policy_breaks, ignore_sha, known_secret)
result_buf.write(
policy_break_header(
policy_breaks, ignore_sha, policy_breaks[0].known_secret
)
)

result_buf.write(
leak_message_located(
flatten_policy_breaks_by_line(policy_breaks),
result.is_on_patch,
clip_long_lines=not self.verbose,
)
result_buf.write(
leak_message_located(
flatten_policy_breaks_by_line(policy_breaks),
result.is_on_patch,
clip_long_lines=not self.verbose,
)
)

file_info_line = ""
if number_of_displayed_secrets > 0:
Expand Down
Loading

0 comments on commit bb6af65

Please sign in to comment.