diff --git a/ggshield/core/filter.py b/ggshield/core/filter.py index 265dd49110..109d5ce347 100644 --- a/ggshield/core/filter.py +++ b/ggshield/core/filter.py @@ -2,10 +2,10 @@ import math import operator import re -from typing import Dict, Iterable, List, Optional, Pattern, Set +from typing import Dict, Iterable, List, Pattern, Set from click import UsageError -from pygitguardian.models import Match, PolicyBreak, ScanResult +from pygitguardian.models import Match, PolicyBreak from ggshield.core.types import IgnoredMatch @@ -22,12 +22,12 @@ MAXIMUM_CENSOR_LENGTH = 60 -def is_ignored( +def is_in_ignored_matches( policy_break: PolicyBreak, matches_ignore: Iterable[IgnoredMatch], ) -> bool: """ - is_ignored checks if a occurrence is ignored. + is_in_ignored_matches checks if a occurrence is ignored. There are 2 ways of ignoring a occurrence: - matching the occurrence sha - matching one of the match.match values @@ -47,42 +47,6 @@ def is_ignored( return False -def remove_ignored_from_result( - scan_result: ScanResult, matches_ignore: Iterable[IgnoredMatch] -) -> None: - """ - remove_ignored removes occurrences from a Scan Result based on a sha - made from its matches. - - :param scan_result: ScanResult to filter - :param matches_ignore: match SHAs or plaintext matches to filter out - """ - - scan_result.policy_breaks = [ - policy_break - for policy_break in scan_result.policy_breaks - if not is_ignored(policy_break, matches_ignore) - ] - - scan_result.policy_break_count = len(scan_result.policy_breaks) - - -def remove_results_from_ignore_detectors( - scan_result: ScanResult, - ignored_detectors: Optional[Set[str]] = None, -) -> None: - if not ignored_detectors: - return - - scan_result.policy_breaks = [ - policy_break - for policy_break in scan_result.policy_breaks - if policy_break.break_type not in ignored_detectors - ] - - scan_result.policy_break_count = len(scan_result.policy_breaks) - - def get_ignore_sha(policy_break: PolicyBreak) -> str: hashable = "".join( [ diff --git a/ggshield/verticals/secret/output/secret_gitlab_webui_output_handler.py b/ggshield/verticals/secret/output/secret_gitlab_webui_output_handler.py index 32a42468a3..a200aafb65 100644 --- a/ggshield/verticals/secret/output/secret_gitlab_webui_output_handler.py +++ b/ggshield/verticals/secret/output/secret_gitlab_webui_output_handler.py @@ -33,23 +33,16 @@ class SecretGitLabWebUIOutputHandler(SecretOutputHandler): use_stderr = True def _process_scan_impl(self, scan: SecretScanCollection) -> str: - results = list(scan.get_all_results()) + results = scan.get_all_results() + + policy_breaks_to_report = [ + policy_break for result in results for policy_break in result.policy_breaks + ] + # If no secrets or no new secrets were found - if not results or (self.ignore_known_secrets and not scan.has_new_secrets): + if len(policy_breaks_to_report) == 0: return "" - policy_breaks_to_report = [] - for result in results: - if not self.ignore_known_secrets: - # Populate GL-HOOK-ERR with all policy breaks found - policy_breaks_to_report += result.scan.policy_breaks - else: - for policy_break in result.scan.policy_breaks: - known = policy_break.known_secret - # Populate GL-HOOK-ERR with only new policy breaks - if not known: - policy_breaks_to_report.append(policy_break) - # Use a set to ensure we do not report duplicate incidents. # (can happen when the secret is present in both the old and the new version of # the document) diff --git a/ggshield/verticals/secret/output/secret_json_output_handler.py b/ggshield/verticals/secret/output/secret_json_output_handler.py index 8baf0c681e..fe78ce67c2 100644 --- a/ggshield/verticals/secret/output/secret_json_output_handler.py +++ b/ggshield/verticals/secret/output/secret_json_output_handler.py @@ -27,7 +27,7 @@ def create_scan_dict( if scan.extra_info: scan_dict["extra_info"] = scan.extra_info - if top and scan.has_results: + if top: scan_dict["secrets_engine_version"] = VERSIONS.secrets_engine_version if scan.results: @@ -74,7 +74,7 @@ def process_result( "total_occurrences": 0, "total_incidents": 0, } - sha_dict = group_policy_breaks_by_ignore_sha(result.scan.policy_breaks) + sha_dict = group_policy_breaks_by_ignore_sha(result.policy_breaks) result_dict["total_incidents"] = len(sha_dict) if not self.show_secrets: diff --git a/ggshield/verticals/secret/output/secret_output_handler.py b/ggshield/verticals/secret/output/secret_output_handler.py index 9e60df006b..8a1d009ea9 100644 --- a/ggshield/verticals/secret/output/secret_output_handler.py +++ b/ggshield/verticals/secret/output/secret_output_handler.py @@ -56,9 +56,6 @@ def _process_scan_impl(self, scan: SecretScanCollection) -> str: raise NotImplementedError() def _get_exit_code(self, scan: SecretScanCollection) -> ExitCode: - if self.ignore_known_secrets: - if scan.has_new_secrets: - return ExitCode.SCAN_FOUND_PROBLEMS - elif scan.has_secrets: + if scan.total_policy_breaks_count > 0: return ExitCode.SCAN_FOUND_PROBLEMS return ExitCode.SUCCESS diff --git a/ggshield/verticals/secret/output/secret_sarif_output_handler.py b/ggshield/verticals/secret/output/secret_sarif_output_handler.py index 5237ea277c..b5ef79faba 100644 --- a/ggshield/verticals/secret/output/secret_sarif_output_handler.py +++ b/ggshield/verticals/secret/output/secret_sarif_output_handler.py @@ -60,7 +60,7 @@ def _create_sarif_results( per policy break. """ for result in results: - for policy_break in result.scan.policy_breaks: + for policy_break in result.policy_breaks: yield _create_sarif_result_dict(result.url, policy_break, incident_details) diff --git a/ggshield/verticals/secret/output/secret_text_output_handler.py b/ggshield/verticals/secret/output/secret_text_output_handler.py index 8d5e08c212..73d2b8dfc6 100644 --- a/ggshield/verticals/secret/output/secret_text_output_handler.py +++ b/ggshield/verticals/secret/output/secret_text_output_handler.py @@ -15,6 +15,7 @@ pluralize, translate_validity, ) +from ggshield.verticals.secret.secret_scanner import IgnoreReason from ..extended_match import ExtendedMatch from ..secret_scan_collection import Result, SecretScanCollection @@ -38,37 +39,35 @@ def _process_scan_impl(self, scan: SecretScanCollection) -> str: scan_buf.write(secrets_engine_version()) scan_buf.write(processed_scan_results) if not processed_scan_results: + scan_buf.write( no_new_leak_message() - if (self.ignore_known_secrets and scan.known_secrets_count) + if self.ignore_known_secrets else no_leak_message() ) - if self.ignore_known_secrets and scan.known_secrets_count > 0: + known_secrets_count = sum( + result.ignored_policy_breaks_count_by_reason.get( + IgnoreReason.KNOWN_SECRET, 0 + ) + for result in scan.get_all_results() + ) + if self.ignore_known_secrets and known_secrets_count > 0: scan_buf.write( - f"\nWarning: {scan.known_secrets_count} {pluralize('secret', scan.known_secrets_count)} ignored " - f"because {pluralize('it is', scan.known_secrets_count, 'they are')} already known by your " + f"\nWarning: {known_secrets_count} {pluralize('secret', known_secrets_count)} ignored " + f"because {pluralize('it is', known_secrets_count, 'they are')} already known by your " f"GitGuardian dashboard and you used the `--ignore-known-secrets` option.\n" ) - - if self.verbose: - scan_buf.write(self.process_scan_results(scan, True)) - else: - scan_buf.write("Use `--verbose` for more details.\n") - + # TODO: display that using --all-secrets will display those return scan_buf.getvalue() - def process_scan_results( - self, scan: SecretScanCollection, show_only_known_secrets: bool = False - ) -> str: + def process_scan_results(self, scan: SecretScanCollection) -> str: """Iterate through the scans and sub-scan results to prepare the display.""" results_buf = StringIO() if scan.results: current_result_buf = StringIO() for result in scan.results.results: - current_result_buf.write( - self.process_result(result, show_only_known_secrets) - ) + current_result_buf.write(self.process_result(result)) current_result_string = current_result_buf.getvalue() # We want to show header when at least one result is not empty @@ -79,16 +78,12 @@ def process_scan_results( if scan.scans: for sub_scan in scan.scans: - inner_scan_str = self.process_scan_results( - sub_scan, show_only_known_secrets - ) + inner_scan_str = self.process_scan_results(sub_scan) results_buf.write(inner_scan_str) return results_buf.getvalue() - def process_result( - self, result: Result, show_only_known_secrets: bool = False - ) -> str: + def process_result(self, result: Result) -> str: """ Build readable message on the found incidents. @@ -99,32 +94,28 @@ def process_result( """ result_buf = StringIO() - sha_dict = group_policy_breaks_by_ignore_sha(result.scan.policy_breaks) + sha_dict = group_policy_breaks_by_ignore_sha(result.policy_breaks) if not self.show_secrets: result.censor() number_of_displayed_secrets = 0 for ignore_sha, policy_breaks in sha_dict.items(): - known_secret = policy_breaks[0].known_secret - if ( - (not known_secret and not show_only_known_secrets) - or (known_secret and show_only_known_secrets) - or not self.ignore_known_secrets - ): - number_of_displayed_secrets += 1 + number_of_displayed_secrets += 1 - result_buf.write( - policy_break_header(policy_breaks, ignore_sha, known_secret) + result_buf.write( + policy_break_header( + policy_breaks, ignore_sha, policy_breaks[0].known_secret ) + ) - result_buf.write( - leak_message_located( - flatten_policy_breaks_by_line(policy_breaks), - result.is_on_patch, - clip_long_lines=not self.verbose, - ) + result_buf.write( + leak_message_located( + flatten_policy_breaks_by_line(policy_breaks), + result.is_on_patch, + clip_long_lines=not self.verbose, ) + ) file_info_line = "" if number_of_displayed_secrets > 0: diff --git a/ggshield/verticals/secret/secret_scan_collection.py b/ggshield/verticals/secret/secret_scan_collection.py index 89d064136b..b342459a2c 100644 --- a/ggshield/verticals/secret/secret_scan_collection.py +++ b/ggshield/verticals/secret/secret_scan_collection.py @@ -1,18 +1,35 @@ from dataclasses import dataclass, field +from enum import Enum from pathlib import Path -from typing import Any, Dict, Iterable, List, NamedTuple, Optional, Tuple, Union, cast +from typing import ( + Any, + Callable, + Dict, + Iterable, + List, + NamedTuple, + Optional, + Tuple, + Union, + cast, +) from pygitguardian import GGClient -from pygitguardian.models import Detail, Match, ScanResult, SecretIncident +from pygitguardian.models import Detail, Match, PolicyBreak, ScanResult, SecretIncident from ggshield.core.errors import UnexpectedError, handle_api_error -from ggshield.core.filter import group_policy_breaks_by_ignore_sha from ggshield.core.lines import Line, get_lines_from_content from ggshield.core.scan.scannable import Scannable from ggshield.utils.git_shell import Filemode from ggshield.verticals.secret.extended_match import ExtendedMatch +class IgnoreReason(Enum): + IGNORED_MATCH = "ignored_match" + IGNORED_DETECTOR = "ignored_detector" + KNOWN_SECRET = "known_secret" + + class Result: """ Return model for a scan which zips the information @@ -23,16 +40,18 @@ class Result: filemode: Filemode path: Path url: str - scan: ScanResult # Result of content scan + policy_breaks: List[PolicyBreak] + ignored_policy_breaks_count_by_reason: Dict[IgnoreReason, int] def __init__(self, file: Scannable, scan: ScanResult): self.filename = file.filename self.filemode = file.filemode self.path = file.path self.url = file.url - self.scan = scan + self.policy_breaks = scan.policy_breaks lines = get_lines_from_content(file.content, self.filemode) self.enrich_matches(lines) + self.ignored_policy_breaks_count_by_reason = {} def __eq__(self, other: Any) -> bool: if not isinstance(other, Result): @@ -42,7 +61,7 @@ def __eq__(self, other: Any) -> bool: and self.filemode == other.filemode and self.path == other.path and self.url == other.url - and self.scan == other.scan + and self.policy_breaks == other.policy_breaks ) @property @@ -52,7 +71,7 @@ def is_on_patch(self) -> bool: def enrich_matches(self, lines: List[Line]) -> None: if len(lines) == 0: raise UnexpectedError("Parsing of scan result failed.") - for policy_break in self.scan.policy_breaks: + for policy_break in self.policy_breaks: policy_break.matches = cast( List[Match], [ @@ -62,13 +81,29 @@ def enrich_matches(self, lines: List[Line]) -> None: ) def censor(self) -> None: - for policy_break in self.scan.policy_breaks: + for policy_break in self.policy_breaks: for extended_match in policy_break.matches: cast(ExtendedMatch, extended_match).censor() @property def has_policy_breaks(self) -> bool: - return self.scan.has_policy_breaks + return len(self.policy_breaks) > 0 + + def apply_ignore_function( + self, reason: IgnoreReason, ignore_function: Callable[[PolicyBreak], bool] + ): + assert ( + reason not in self.ignored_policy_breaks_count_by_reason + ), f"Ignore was already computed for {IgnoreReason}" + to_keep = [] + ignored_count = 0 + for policy_break in self.policy_breaks: + if ignore_function(policy_break): + ignored_count += 1 + else: + to_keep.append(policy_break) + self.policy_breaks = to_keep + self.ignored_policy_breaks_count_by_reason[reason] = ignored_count class Error(NamedTuple): @@ -131,18 +166,9 @@ def __init__( self.optional_header = optional_header self.extra_info = extra_info - ( - self.known_secrets_count, - self.new_secrets_count, - ) = self._get_known_new_secrets_count() - - @property - def has_new_secrets(self) -> bool: - return self.new_secrets_count > 0 - - @property - def has_secrets(self) -> bool: - return (self.new_secrets_count + self.known_secrets_count) > 0 + self.total_policy_breaks_count = sum( + len(result.policy_breaks) for result in self.get_all_results() + ) @property def scans_with_results(self) -> List["SecretScanCollection"]: @@ -150,28 +176,6 @@ def scans_with_results(self) -> List["SecretScanCollection"]: return [scan for scan in self.scans if scan.results] return [] - @property - def has_results(self) -> bool: - return bool(self.results and self.results.results) - - def _get_known_new_secrets_count(self) -> Tuple[int, int]: - policy_breaks = [] - for result in self.get_all_results(): - for policy_break in result.scan.policy_breaks: - policy_breaks.append(policy_break) - - known_secrets_count = 0 - new_secrets_count = 0 - sha_dict = group_policy_breaks_by_ignore_sha(policy_breaks) - - for ignore_sha, policy_breaks in sha_dict.items(): - if policy_breaks[0].known_secret: - known_secrets_count += 1 - else: - new_secrets_count += 1 - - return known_secrets_count, new_secrets_count - def get_all_results(self) -> Iterable[Result]: """Returns an iterable on all results and sub-scan results""" if self.results: @@ -184,7 +188,7 @@ def get_all_results(self) -> Iterable[Result]: def get_incident_details(self, client: GGClient) -> Dict[str, SecretIncident]: incident_details: dict[str, SecretIncident] = {} for result in self.get_all_results(): - for policy_break in result.scan.policy_breaks: + for policy_break in result.policy_breaks: url = policy_break.incident_url if url and url not in incident_details: incident_id = int(url.split("/")[-1]) diff --git a/ggshield/verticals/secret/secret_scanner.py b/ggshield/verticals/secret/secret_scanner.py index 93430559b7..0f3aa1f48a 100644 --- a/ggshield/verticals/secret/secret_scanner.py +++ b/ggshield/verticals/secret/secret_scanner.py @@ -15,15 +15,12 @@ from ggshield.core.config.user_config import SecretConfig from ggshield.core.constants import MAX_WORKERS from ggshield.core.errors import MissingScopesError, UnexpectedError, handle_api_error -from ggshield.core.filter import ( - remove_ignored_from_result, - remove_results_from_ignore_detectors, -) +from ggshield.core.filter import is_in_ignored_matches from ggshield.core.scan import DecodeError, ScanContext, Scannable from ggshield.core.text_utils import pluralize from ggshield.core.ui.scanner_ui import ScannerUI -from .secret_scan_collection import Error, Result, Results +from .secret_scan_collection import Error, IgnoreReason, Result, Results # GitGuardian API does not accept paths longer than this @@ -58,6 +55,7 @@ def __init__( self.client = client self.cache = cache + self.secret_config = secret_config self.ignored_matches = secret_config.ignored_matches or [] self.ignored_detectors = secret_config.ignored_detectors self.headers = scan_context.get_http_headers() @@ -215,18 +213,32 @@ def _collect_results( continue assert isinstance(scan, MultiScanResult) - for file, scanned in zip(chunk, scan.scan_results): - remove_ignored_from_result(scanned, self.ignored_matches) - remove_results_from_ignore_detectors(scanned, self.ignored_detectors) - if scanned.has_policy_breaks: - for policy_break in scanned.policy_breaks: - self.cache.add_found_policy_break(policy_break, file.filename) - results.append( - Result( - file=file, - scan=scanned, - ) + for file, scan_result in zip(chunk, scan.scan_results): + result = Result( + file=file, + scan=scan_result, + ) + if not scan_result.has_policy_breaks: + continue + result.apply_ignore_function( + IgnoreReason.IGNORED_MATCH, + lambda policy_break: is_in_ignored_matches( + policy_break, self.ignored_matches + ), + ) + result.apply_ignore_function( + IgnoreReason.IGNORED_DETECTOR, + lambda policy_break: policy_break.break_type + in self.ignored_detectors, + ) + if self.secret_config.ignore_known_secrets: + result.apply_ignore_function( + IgnoreReason.KNOWN_SECRET, + lambda policy_break: policy_break.known_secret, ) + for policy_break in result.policy_breaks: + self.cache.add_found_policy_break(policy_break, file.filename) + results.append(result) self.cache.save() return Results(results=results, errors=errors) diff --git a/tests/unit/cmd/test_ignore.py b/tests/unit/cmd/test_ignore.py index 06ff773f03..9c9c97c98c 100644 --- a/tests/unit/cmd/test_ignore.py +++ b/tests/unit/cmd/test_ignore.py @@ -148,7 +148,7 @@ def test_cache_catches_nothing(client, isolated_fs): ) results = scanner.scan(commit.get_files(), scanner_ui=Mock()) - assert results.results == [] + assert sum(len(result.policy_breaks) for result in results.results) == 0 assert config.user_config.secret.ignored_matches == FOUND_SECRETS assert cache.last_found_secrets == [] diff --git a/tests/unit/core/test_filter.py b/tests/unit/core/test_filter.py index bcf3293dee..239d0e01a1 100644 --- a/tests/unit/core/test_filter.py +++ b/tests/unit/core/test_filter.py @@ -6,18 +6,17 @@ from pygitguardian.models import Match, PolicyBreak, ScanResult from snapshottest import Snapshot -from ggshield.core.filter import ( - censor_match, - get_ignore_sha, - remove_ignored_from_result, -) +from ggshield.core.filter import censor_match, get_ignore_sha, is_in_ignored_matches +from ggshield.core.scan.scannable import StringScannable from ggshield.core.types import IgnoredMatch +from ggshield.verticals.secret.secret_scan_collection import Result from tests.unit.conftest import ( _MULTILINE_SECRET, _MULTIPLE_SECRETS_SCAN_RESULT, + _ONE_LINE_AND_MULTILINE_PATCH_CONTENT, _ONE_LINE_AND_MULTILINE_PATCH_SCAN_RESULT, + _SIMPLE_SECRET_PATCH, _SIMPLE_SECRET_PATCH_SCAN_RESULT, - _SIMPLE_SECRET_WITH_FILENAME_PATCH_SCAN_RESULT, ) @@ -90,39 +89,38 @@ def test_get_ignore_sha( @pytest.mark.parametrize( - "scan_result, ignores, final_len", + ("content", "scan_result", "ignores", "final_len"), [ pytest.param( + _SIMPLE_SECRET_PATCH, _SIMPLE_SECRET_PATCH_SCAN_RESULT, [], _SIMPLE_SECRET_PATCH_SCAN_RESULT.policy_break_count, id="_SIMPLE_SECRET_PATCH_SCAN_RESULT-no remove, not all policies", ), pytest.param( - _SIMPLE_SECRET_WITH_FILENAME_PATCH_SCAN_RESULT, - [], - _SIMPLE_SECRET_WITH_FILENAME_PATCH_SCAN_RESULT.policy_break_count - 1, - id="_SIMPLE_SECRET_PATCH_WITH_FILENAME_SCAN_RESULT-not all policies", - ), - pytest.param( + _SIMPLE_SECRET_PATCH, _SIMPLE_SECRET_PATCH_SCAN_RESULT, ["2b5840babacb6f089ddcce1fe5a56b803f8b1f636c6f44cdbf14b0c77a194c93"], 0, id="_SIMPLE_SECRET_PATCH_SCAN_RESULT-remove by sha", ), pytest.param( + _SIMPLE_SECRET_PATCH, _SIMPLE_SECRET_PATCH_SCAN_RESULT, ["368ac3edf9e850d1c0ff9d6c526496f8237ddf91"], 0, id="_SIMPLE_SECRET_PATCH_SCAN_RESULT-remove by plaintext", ), pytest.param( + _ONE_LINE_AND_MULTILINE_PATCH_CONTENT, _ONE_LINE_AND_MULTILINE_PATCH_SCAN_RESULT, ["1945f4a0c42abb19c1a420ddd09b4b4681249a3057c427b95f794b18595e7ffa"], 2, id="_MULTI_SECRET_ONE_LINE_PATCH_SCAN_RESULT-remove one by sha", ), pytest.param( + _ONE_LINE_AND_MULTILINE_PATCH_CONTENT, _ONE_LINE_AND_MULTILINE_PATCH_SCAN_RESULT, [ "060bf63de122848f5efa122fe6cea504aae3b24cea393d887fdefa1529c6a02e", @@ -134,14 +132,19 @@ def test_get_ignore_sha( ], ) def test_remove_ignores( - scan_result: ScanResult, ignores: Iterable, final_len: int + content: str, scan_result: ScanResult, ignores: Iterable, final_len: int ) -> None: - copy_result = copy.deepcopy(scan_result) - ignored_matches = [IgnoredMatch(name="", match=x) for x in ignores] - remove_ignored_from_result(copy_result, ignored_matches) + result = Result( + file=StringScannable(url="localhost", content=content), + scan=copy.deepcopy(scan_result), + ) - assert len(copy_result.policy_breaks) == final_len - assert copy_result.policy_break_count == final_len + ignored_matches = [IgnoredMatch(name="", match=x) for x in ignores] + result.apply_ignore_function( + "ignored_matches", + lambda policy_break: is_in_ignored_matches(policy_break, ignored_matches), + ) + assert len(result.policy_breaks) == final_len @pytest.mark.parametrize( diff --git a/tests/unit/utils/test_os.py b/tests/unit/utils/test_os.py index 11651d7413..96ebc6df8d 100644 --- a/tests/unit/utils/test_os.py +++ b/tests/unit/utils/test_os.py @@ -1,9 +1,16 @@ +import os import sys from typing import AnyStr, Tuple import pytest -from ggshield.utils.os import getenv_bool, getenv_float, getenv_int, parse_os_release +from ggshield.utils.os import ( + cd, + getenv_bool, + getenv_float, + getenv_int, + parse_os_release, +) @pytest.mark.skipif( @@ -88,3 +95,11 @@ def test_getenv_bool(monkeypatch, env_value, default, expected): else: monkeypatch.delenv(key, raising=False) assert getenv_bool(key, default) == expected + + +def test_cd_context_manager(tmpdir): + prev = os.getcwd() + assert prev != tmpdir + with cd(tmpdir): + assert os.getcwd() == tmpdir + assert os.getcwd() == prev diff --git a/tests/unit/verticals/secret/output/test_gitlab_webui_output.py b/tests/unit/verticals/secret/output/test_gitlab_webui_output.py index 0040efb2e5..bbf745a450 100644 --- a/tests/unit/verticals/secret/output/test_gitlab_webui_output.py +++ b/tests/unit/verticals/secret/output/test_gitlab_webui_output.py @@ -1,16 +1,11 @@ -from copy import deepcopy - -import pytest from pygitguardian.models import Match, PolicyBreak from ggshield.core.config.user_config import SecretConfig -from ggshield.core.scan import StringScannable -from ggshield.verticals.secret import Result, Results, SecretScanCollection +from ggshield.verticals.secret import Results, SecretScanCollection from ggshield.verticals.secret.output.secret_gitlab_webui_output_handler import ( SecretGitLabWebUIOutputHandler, format_policy_break, ) -from tests.unit.conftest import _ONE_LINE_AND_MULTILINE_PATCH_CONTENT, TWO_POLICY_BREAKS def test_format_policy_break(): @@ -33,18 +28,14 @@ def test_format_policy_break(): assert match.match not in out -@pytest.mark.parametrize("ignore_known_secrets", [True, False]) -def test_gitlab_web_ui_output_no_secrets(ignore_known_secrets): +def test_gitlab_web_ui_output_no_secrets(): """ GIVEN a content with no secret WHEN GitLabWebUIOutputHandler manipulates the corresponding scan THEN the error message is empty as expected and the status code is zero """ - secret_config = SecretConfig( - show_secrets=True, ignore_known_secrets=ignore_known_secrets - ) output_handler = SecretGitLabWebUIOutputHandler( - secret_config=secret_config, verbose=False + secret_config=SecretConfig(), verbose=False ) scan = SecretScanCollection( id="scan", @@ -64,71 +55,3 @@ def test_gitlab_web_ui_output_no_secrets(ignore_known_secrets): assert exit_code == 0 assert error_msg == "" - - -@pytest.mark.parametrize("ignore_known_secrets", [True, False]) -@pytest.mark.parametrize( - "secrets_types", - ["only_new_secrets", "only_known_secrets", "mixed_secrets"], -) -def test_gitlab_web_ui_output_ignore_known_secrets(secrets_types, ignore_known_secrets): - """ - GIVEN a content with secrets - WHEN GitLabWebUIOutputHandler manipulates the corresponding scan - THEN the error message warns about secrets or about only new secrets depending - on the ignore_known_secrets parameter - """ - result: Result = Result( - StringScannable(content=_ONE_LINE_AND_MULTILINE_PATCH_CONTENT, url="leak.txt"), - scan=deepcopy(TWO_POLICY_BREAKS), # 2 policy breaks - ) - - all_policy_breaks = result.scan.policy_breaks - - if secrets_types == "only_known_secrets": - known_policy_breaks = all_policy_breaks - new_policy_breaks = [] - elif secrets_types == "mixed_secrets": - # set only first policy break as known - known_policy_breaks = all_policy_breaks[:1] - new_policy_breaks = all_policy_breaks[1:] - else: - known_policy_breaks = [] - new_policy_breaks = all_policy_breaks - - for index, policy_break in enumerate(known_policy_breaks): - policy_break.known_secret = True - policy_break.incident_url = ( - f"https://dashboard.gitguardian.com/workspace/1/incidents/{index}" - ) - - secret_config = SecretConfig( - show_secrets=True, ignore_known_secrets=ignore_known_secrets - ) - output_handler = SecretGitLabWebUIOutputHandler( - secret_config=secret_config, verbose=False - ) - output = output_handler._process_scan_impl( - SecretScanCollection( - id="outer_scan", - type="outer_scan", - results=Results(results=[], errors=[]), - scans=[ - SecretScanCollection( - id="scan", - type="test", - results=Results( - results=[result], - errors=[], - ), - ) - ], - ) - ) - if ignore_known_secrets: - if len(new_policy_breaks): - assert f"ggshield found {len(new_policy_breaks)} new" in output - else: - assert output == "" - else: - assert f"ggshield found {len(all_policy_breaks)}" in output diff --git a/tests/unit/verticals/secret/output/test_json_output.py b/tests/unit/verticals/secret/output/test_json_output.py index b8982cb084..26a1041f75 100644 --- a/tests/unit/verticals/secret/output/test_json_output.py +++ b/tests/unit/verticals/secret/output/test_json_output.py @@ -415,9 +415,7 @@ def test_json_output_for_patch( assert all( ignore_sha in json_flat_results for result in results.results - for ignore_sha in group_policy_breaks_by_ignore_sha( - result.scan.policy_breaks - ) + for ignore_sha in group_policy_breaks_by_ignore_sha(result.policy_breaks) ) @@ -449,7 +447,7 @@ def test_ignore_known_secrets(verbose, ignore_known_secrets, secrets_types): scan=deepcopy(TWO_POLICY_BREAKS), # 2 policy breaks ) - all_policy_breaks = result.scan.policy_breaks + all_policy_breaks = result.policy_breaks known_policy_breaks = [] new_policy_breaks = all_policy_breaks @@ -542,7 +540,7 @@ def test_with_incident_details( scan=deepcopy(TWO_POLICY_BREAKS), # 2 policy breaks ) - all_policy_breaks = result.scan.policy_breaks + all_policy_breaks = result.policy_breaks known_policy_breaks = [] diff --git a/tests/unit/verticals/secret/output/test_sarif_output.py b/tests/unit/verticals/secret/output/test_sarif_output.py index 58f1318e75..fb7e404e93 100644 --- a/tests/unit/verticals/secret/output/test_sarif_output.py +++ b/tests/unit/verticals/secret/output/test_sarif_output.py @@ -261,9 +261,7 @@ def test_sarif_output_for_nested_scan(init_secrets_engine_version): assert SCHEMA_WITH_INCIDENTS == json_dict # Create a flat list of policy breaks - policy_breaks = sum( - (s.results.results[0].scan.policy_breaks for s in scan.scans), [] - ) + policy_breaks = sum((s.results.results[0].policy_breaks for s in scan.scans), []) # Check each found secret is correctly represented sarif_results = json_dict["runs"][0]["results"] diff --git a/tests/unit/verticals/secret/output/test_text_output.py b/tests/unit/verticals/secret/output/test_text_output.py index 648864fae7..07b322ac58 100644 --- a/tests/unit/verticals/secret/output/test_text_output.py +++ b/tests/unit/verticals/secret/output/test_text_output.py @@ -26,7 +26,6 @@ _SIMPLE_SECRET_MULTILINE_PATCH_SCAN_RESULT, _SIMPLE_SECRET_PATCH, _SIMPLE_SECRET_PATCH_SCAN_RESULT, - TWO_POLICY_BREAKS, ) @@ -143,9 +142,7 @@ def test_leak_message(result_input, snapshot, show_secrets, verbose): # all ignore sha should be in the output assert all( ignore_sha in output - for ignore_sha in group_policy_breaks_by_ignore_sha( - result_input.scan.policy_breaks - ) + for ignore_sha in group_policy_breaks_by_ignore_sha(result_input.policy_breaks) ) @@ -214,150 +211,5 @@ def assert_no_leak_message_is_diplayed( assert "No new secrets have been found" not in output -@pytest.mark.parametrize("verbose", [True, False]) -@pytest.mark.parametrize("ignore_known_secrets", [True, False]) -@pytest.mark.parametrize( - "secrets_types", - ["only_new_secrets", "only_known_secrets", "mixed_secrets", "no_secrets"], -) -def test_ignore_known_secrets(verbose, ignore_known_secrets, secrets_types): - """ - GIVEN policy breaks - WHEN generating text output - THEN if ignore_known_secrets is used, do not show known secret (unless the verbose mode) - """ - secret_config = SecretConfig( - show_secrets=True, ignore_known_secrets=ignore_known_secrets - ) - output_handler = SecretTextOutputHandler( - secret_config=secret_config, verbose=verbose - ) - - result: Result = Result( - StringScannable(content=_ONE_LINE_AND_MULTILINE_PATCH_CONTENT, url="leak.txt"), - scan=deepcopy(TWO_POLICY_BREAKS), # 2 policy breaks - ) - - all_policy_breaks = result.scan.policy_breaks - - known_policy_breaks = [] - new_policy_breaks = all_policy_breaks - - if secrets_types == "no_secrets": - known_policy_breaks = [] - new_policy_breaks = [] - elif secrets_types == "only_known_secrets": - known_policy_breaks = all_policy_breaks - new_policy_breaks = [] - elif secrets_types == "mixed_secrets": - # set only first policy break as known - known_policy_breaks = all_policy_breaks[:1] - new_policy_breaks = all_policy_breaks[1:] - - for index, policy_break in enumerate(known_policy_breaks): - policy_break.known_secret = True - policy_break.incident_url = ( - f"https://dashboard.gitguardian.com/workspace/1/incidents/{index}" - ) - - # call output handler - output = output_handler._process_scan_impl( - SecretScanCollection( - id="outer_scan", - type="outer_scan", - results=Results(results=[], errors=[]), - scans=[ - SecretScanCollection( - id="scan", - type="test", - results=Results( - results=[result] if secrets_types != "no_secrets" else [], - errors=[], - ), - optional_header="> This is an example header", - ) - ], - ) - ) - - output = click.unstyle(output) - - assert_policies_displayed( - output, verbose, ignore_known_secrets, known_policy_breaks + new_policy_breaks - ) - assert_warning_is_displayed( - output, ignore_known_secrets, secrets_types, len(known_policy_breaks) - ) - assert_no_leak_message_is_diplayed(output, ignore_known_secrets, secrets_types) - - -@pytest.mark.parametrize("ignore_known_secrets", [True, False]) -@pytest.mark.parametrize( - "secrets_types", ["only_new_secrets", "only_known_secrets", "mixed_secrets"] -) -def test_ignore_known_secrets_exit_code(ignore_known_secrets, secrets_types): - """ - GIVEN policy breaks - WHEN checking for the exit code - THEN the exit code is 1 when the new secrets are present, and 0 otherwise - """ - secret_config = SecretConfig( - show_secrets=True, ignore_known_secrets=ignore_known_secrets - ) - output_handler = SecretTextOutputHandler(secret_config=secret_config, verbose=False) - - result: Result = Result( - StringScannable( - content=_ONE_LINE_AND_MULTILINE_PATCH_CONTENT, - url="leak.txt", - ), - scan=deepcopy(TWO_POLICY_BREAKS), # 2 policy breaks - ) - - all_policy_breaks = result.scan.policy_breaks - - known_policy_breaks = [] - new_policy_breaks = all_policy_breaks - - if secrets_types == "only_known_secrets": - known_policy_breaks = all_policy_breaks - new_policy_breaks = [] - elif secrets_types == "mixed_secrets": - # set only first policy break as known - known_policy_breaks = all_policy_breaks[:1] - new_policy_breaks = all_policy_breaks[1:] - - for index, policy_break in enumerate(known_policy_breaks): - policy_break.known_secret = True - policy_break.incident_url = ( - f"https://dashboard.gitguardian.com/workspace/1/incidents/{index}" - ) - - # call output handler - exit_code = output_handler._get_exit_code( - SecretScanCollection( - id="outer_scan", - type="outer_scan", - results=Results(results=[], errors=[]), - scans=[ - SecretScanCollection( - id="scan", - type="test", - results=Results(results=[result], errors=[]), - optional_header="> This is an example header", - ) - ], - ) - ) - - expected_exit_code = ( - len(new_policy_breaks) > 0 - if ignore_known_secrets - else len(all_policy_breaks) > 0 - ) - - assert exit_code == expected_exit_code - - def test_format_line_count_break(): assert format_line_count_break(5) == "\x1b[36m\x1b[22m\x1b[22m ...\n\x1b[0m" diff --git a/tests/unit/verticals/secret/test_scan.py b/tests/unit/verticals/secret/test_scan.py deleted file mode 100644 index f2517e1cec..0000000000 --- a/tests/unit/verticals/secret/test_scan.py +++ /dev/null @@ -1,66 +0,0 @@ -import os -import platform -from unittest.mock import ANY, Mock, patch - -from click import Command, Context, Group -from pygitguardian.models import MultiScanResult, ScanResult - -from ggshield import __version__ -from ggshield.core.cache import Cache -from ggshield.core.config.user_config import SecretConfig -from ggshield.core.scan import Commit, ScanContext, ScanMode -from ggshield.utils.os import cd, get_os_info -from ggshield.verticals.secret import SecretScanner -from tests.unit.conftest import UNCHECKED_SECRET_PATCH - - -def test_cd_context_manager(tmpdir): - prev = os.getcwd() - assert prev != tmpdir - with cd(tmpdir): - assert os.getcwd() == tmpdir - assert os.getcwd() == prev - - -@patch("pygitguardian.GGClient.multi_content_scan") -def test_request_headers(scan_mock: Mock, client): - """ - GIVEN a commit to scan - WHEN SecretScanner.scan() is called on it - THEN GGClient.multi_content_scan() is called with the correct values for - `extra_headers` - """ - c = Commit.from_patch(UNCHECKED_SECRET_PATCH) - - scan_result = ScanResult(policy_break_count=0, policy_breaks=[], policies=[]) - multi_scan_result = MultiScanResult([scan_result]) - multi_scan_result.status_code = 200 - scan_mock.return_value = multi_scan_result - - with Context(Command("bar"), info_name="bar") as ctx: - os_name, os_version = get_os_info() - ctx.parent = Context(Group("foo"), info_name="foo") - scanner = SecretScanner( - client=client, - cache=Cache(), - scan_context=ScanContext( - scan_mode=ScanMode.PATH, - command_path=ctx.command_path, - ), - check_api_key=False, - secret_config=SecretConfig(), - ) - scanner.scan(c.get_files(), scanner_ui=Mock()) - scan_mock.assert_called_with( - ANY, - { - "GGShield-Version": __version__, - "GGShield-Command-Path": "foo bar", - "GGShield-Command-Id": ANY, - "GGShield-OS-Name": os_name, - "GGShield-OS-Version": os_version, - "GGShield-Python-Version": platform.python_version(), - "mode": "path", - }, - ignore_known_secrets=True, - ) diff --git a/tests/unit/verticals/secret/test_scan_repo.py b/tests/unit/verticals/secret/test_scan_repo.py index e2dc65201a..9f94be9a11 100644 --- a/tests/unit/verticals/secret/test_scan_repo.py +++ b/tests/unit/verticals/secret/test_scan_repo.py @@ -146,7 +146,7 @@ def test_scan_2_commits_same_content(secret_scanner_mock): assert len(scan_collection.scans) == 2 all_policy_breaks_count = sum( - result.scan.policy_break_count for result in scan_collection.get_all_results() + len(result.policy_breaks) for result in scan_collection.get_all_results() ) assert all_policy_breaks_count == 4 diff --git a/tests/unit/verticals/secret/test_secret_scanner.py b/tests/unit/verticals/secret/test_secret_scanner.py index 69b17057f4..ef1cb978d9 100644 --- a/tests/unit/verticals/secret/test_secret_scanner.py +++ b/tests/unit/verticals/secret/test_secret_scanner.py @@ -1,10 +1,21 @@ +import platform from collections import namedtuple -from unittest.mock import Mock +from unittest.mock import ANY, Mock, patch import click import pytest -from pygitguardian.models import APITokensResponse, Detail +from click import Command, Context, Group +from pygitguardian.models import ( + APITokensResponse, + Detail, + Match, + MultiScanResult, + PolicyBreak, + ScanResult, +) +from ggshield import __version__ +from ggshield.core.cache import Cache from ggshield.core.config.user_config import SecretConfig from ggshield.core.errors import ( ExitCode, @@ -22,6 +33,7 @@ ) from ggshield.core.ui.scanner_ui import ScannerUI from ggshield.utils.git_shell import Filemode +from ggshield.utils.os import get_os_info from ggshield.verticals.secret import SecretScanner from ggshield.verticals.secret.secret_scanner import handle_scan_chunk_error from tests.unit.conftest import ( @@ -101,15 +113,14 @@ def test_scan_patch(client, cache, name: str, input_patch: str, expected: Expect ) results = scanner.scan(commit.get_files(), scanner_ui=Mock()) for result in results.results: - if result.scan.policy_breaks: - assert len(result.scan.policy_breaks[0].matches) == expected.matches + if result.policy_breaks: + assert len(result.policy_breaks[0].matches) == expected.matches if expected.first_match: assert ( - result.scan.policy_breaks[0].matches[0].match - == expected.first_match + result.policy_breaks[0].matches[0].match == expected.first_match ) else: - assert result.scan.policy_breaks == [] + assert result.policy_breaks == [] if expected.want: assert result.content == expected.want["content"] @@ -241,10 +252,10 @@ def test_scan_merge_commit(client, cache): secret_config=SecretConfig(), ) results = scanner.scan(commit.get_files(), scanner_ui=Mock()) - scan = results.results[0].scan - assert len(scan.policy_breaks) == 1 + policy_breaks = results.results[0].policy_breaks + assert len(policy_breaks) == 1 - matches = {m.match_type: m.match for m in scan.policy_breaks[0].matches} + matches = {m.match_type: m.match for m in policy_breaks[0].matches} assert matches["username"] == "owly" assert matches["password"] == _SIMPLE_SECRET_TOKEN @@ -280,3 +291,139 @@ def test_with_incident_details_error( secret_config=SecretConfig(with_incident_details=True), ) assert message in str(exc_info.value) + + +@patch("pygitguardian.GGClient.multi_content_scan") +def test_request_headers(scan_mock: Mock, client): + """ + GIVEN a commit to scan + WHEN SecretScanner.scan() is called on it + THEN GGClient.multi_content_scan() is called with the correct values for + `extra_headers` + """ + c = Commit.from_patch(UNCHECKED_SECRET_PATCH) + + scan_result = ScanResult(policy_break_count=0, policy_breaks=[], policies=[]) + multi_scan_result = MultiScanResult([scan_result]) + multi_scan_result.status_code = 200 + scan_mock.return_value = multi_scan_result + + with Context(Command("bar"), info_name="bar") as ctx: + os_name, os_version = get_os_info() + ctx.parent = Context(Group("foo"), info_name="foo") + scanner = SecretScanner( + client=client, + cache=Cache(), + scan_context=ScanContext( + scan_mode=ScanMode.PATH, + command_path=ctx.command_path, + ), + check_api_key=False, + secret_config=SecretConfig(), + ) + scanner.scan(c.get_files(), scanner_ui=Mock()) + scan_mock.assert_called_with( + ANY, + { + "GGShield-Version": __version__, + "GGShield-Command-Path": "foo bar", + "GGShield-Command-Id": ANY, + "GGShield-OS-Name": os_name, + "GGShield-OS-Version": os_version, + "GGShield-Python-Version": platform.python_version(), + "mode": "path", + }, + ignore_known_secrets=True, + ) + + +@pytest.mark.parametrize("ignore_known_secrets", (True, False)) +@patch("pygitguardian.GGClient.multi_content_scan") +def test_scan_ignore_known_secrets(scan_mock: Mock, client, ignore_known_secrets): + """ + GIVEN a call multi_content_scan returning two policy breaks, one known and the other unknown + WHEN - + THEN the known policy break is ignored iff ignore_known_secrets is True + """ + scannable = StringScannable(url="localhost", content="known\nunknown") + known_secret = PolicyBreak( + break_type="a", + policy="Secrets detection", + validity="valid", + known_secret=True, + matches=[ + Match( + match="known", + match_type="apikey", + line_start=0, + line_end=0, + index_start=0, + index_end=1, + ) + ], + ) + unknown_secret = PolicyBreak( + break_type="a", + policy="Secrets detection", + validity="valid", + known_secret=False, + matches=[ + Match( + match="unknown", + match_type="apikey", + line_start=0, + line_end=0, + index_start=0, + index_end=1, + ) + ], + ) + + scan_result = ScanResult( + policy_break_count=1, policy_breaks=[known_secret, unknown_secret], policies=[] + ) + multi_scan_result = MultiScanResult([scan_result]) + multi_scan_result.status_code = 200 + scan_mock.return_value = multi_scan_result + + scanner = SecretScanner( + client=client, + cache=Cache(), + scan_context=ScanContext( + scan_mode=ScanMode.PATH, + command_path="ggshield", + ), + check_api_key=False, + secret_config=SecretConfig(ignore_known_secrets=ignore_known_secrets), + ) + results = scanner.scan([scannable], scanner_ui=Mock()) + + if ignore_known_secrets: + assert results.results[0].policy_breaks == [unknown_secret] + else: + assert results.results[0].policy_breaks == [known_secret, unknown_secret] + + +@patch("pygitguardian.GGClient.multi_content_scan") +def test_scan_unexpected_error(scan_mock: Mock, client): + """ + GIVEN a call multi_content_scan raising an exception + WHEN calling scanner.scan + THEN an UnexpectedError is raised + """ + scannable = StringScannable(url="localhost", content="known\nunknown") + + scan_mock.side_effect = Exception("dummy") + + scanner = SecretScanner( + client=client, + cache=Cache(), + scan_context=ScanContext( + scan_mode=ScanMode.PATH, + command_path="ggshield", + ), + check_api_key=False, + secret_config=SecretConfig(), + ) + with pytest.raises(UnexpectedError, match="Scanning failed.*"): + scanner.scan([scannable], scanner_ui=Mock())