diff --git a/.gitignore b/.gitignore index d3822f0e..1de92d03 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,41 @@ -__pycache__ -/build/ -/*.egg-info/ -/dist/ +.idea +*.iml +.env -# coverage +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ .coverage -htmlcov -coverage.* +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ diff --git a/cycode/cli/auth/auth_command.py b/cycode/cli/auth/auth_command.py index 979fdb19..f7605965 100644 --- a/cycode/cli/auth/auth_command.py +++ b/cycode/cli/auth/auth_command.py @@ -1,8 +1,8 @@ -import json - import click import traceback +from cycode.cli.models import CliResult, CliErrors, CliError +from cycode.cli.printers import ConsolePrinter from cycode.cli.auth.auth_manager import AuthManager from cycode.cli.user_settings.credentials_manager import CredentialsManager from cycode.cli.exceptions.custom_exceptions import AuthProcessError, NetworkError, HttpUnauthorizedError @@ -19,10 +19,13 @@ def authenticate(context: click.Context): return try: - logger.debug("starting authentication process") + logger.debug('Starting authentication process') + auth_manager = AuthManager() auth_manager.authenticate() - click.echo("Successfully logged into cycode") + + result = CliResult(success=True, message='Successfully logged into cycode') + ConsolePrinter(context).print_result(result) except Exception as e: _handle_exception(context, e) @@ -31,49 +34,45 @@ def authenticate(context: click.Context): @click.pass_context def authorization_check(context: click.Context): """ Check your machine associating CLI with your cycode account """ - passed_auth_check_args = {'context': context, 'content': { - 'success': True, - 'message': 'You are authorized' - }, 'color': 'green'} - failed_auth_check_args = {'context': context, 'content': { - 'success': False, - 'message': 'You are not authorized' - }, 'color': 'red'} + printer = ConsolePrinter(context) + + passed_auth_check_res = CliResult(success=True, message='You are authorized') + failed_auth_check_res = CliResult(success=False, message='You are not authorized') client_id, client_secret = CredentialsManager().get_credentials() if not client_id or not client_secret: - return _print_result(**failed_auth_check_args) + return printer.print_result(failed_auth_check_res) try: - # TODO(MarshalX): This property performs HTTP request to refresh the token. This must be the method. if CycodeTokenBasedClient(client_id, client_secret).api_token: - return _print_result(**passed_auth_check_args) + return printer.print_result(passed_auth_check_res) except (NetworkError, HttpUnauthorizedError): if context.obj['verbose']: click.secho(f'Error: {traceback.format_exc()}', fg='red', nl=False) - return _print_result(**failed_auth_check_args) + return printer.print_result(failed_auth_check_res) -def _print_result(context: click.Context, content: dict, color: str) -> None: - # the current impl of printers supports only results of scans - if context.obj['output'] == 'text': - return click.secho(content['message'], fg=color) +def _handle_exception(context: click.Context, e: Exception): + if context.obj['verbose']: + click.secho(f'Error: {traceback.format_exc()}', fg='red', nl=False) - return click.echo(json.dumps({'result': content['success'], 'message': content['message']})) + errors: CliErrors = { + AuthProcessError: CliError( + code='auth_error', + message='Authentication failed. Please try again later using the command `cycode auth`' + ), + NetworkError: CliError( + code='cycode_error', + message='Authentication failed. Please try again later using the command `cycode auth`' + ), + } + error = errors.get(type(e)) + if error: + return ConsolePrinter(context).print_error(error) -def _handle_exception(context: click.Context, e: Exception): - verbose = context.obj["verbose"] - if verbose: - click.secho(f'Error: {traceback.format_exc()}', fg='red', nl=False) - if isinstance(e, AuthProcessError): - click.secho('Authentication failed. Please try again later using the command `cycode auth`', - fg='red', nl=False) - elif isinstance(e, NetworkError): - click.secho('Authentication failed. Please try again later using the command `cycode auth`', - fg='red', nl=False) - elif isinstance(e, click.ClickException): + if isinstance(e, click.ClickException): raise e - else: - raise click.ClickException(str(e)) + + raise click.ClickException(str(e)) diff --git a/cycode/cli/code_scanner.py b/cycode/cli/code_scanner.py index 9badd037..7974129b 100644 --- a/cycode/cli/code_scanner.py +++ b/cycode/cli/code_scanner.py @@ -1,5 +1,3 @@ -from typing import Type, NamedTuple - import click import json import logging @@ -14,8 +12,8 @@ from halo import Halo -from cycode.cli.printers import ResultsPrinter -from cycode.cli.models import Document, DocumentDetections, Severity +from cycode.cli.printers import ConsolePrinter +from cycode.cli.models import Document, DocumentDetections, Severity, CliError, CliErrors from cycode.cli.ci_integrations import get_commit_range from cycode.cli.consts import * from cycode.cli.config import configuration_manager @@ -446,14 +444,15 @@ def print_scan_details(scan_details_response: ScanDetailsResponse): if scan_details_response.message is not None: logger.info(f"Scan message: {scan_details_response.message}") -def print_results(context: click.Context, document_detections_list: List[DocumentDetections]): - output_type = context.obj['output'] - printer = ResultsPrinter() - printer.print_results(context, document_detections_list, output_type) + +def print_results(context: click.Context, document_detections_list: List[DocumentDetections]) -> None: + printer = ConsolePrinter(context) + printer.print_scan_results(document_detections_list) -def enrich_scan_result(scan_result: ZippedFileScanResult, documents_to_scan: List[Document]) -> \ - List[DocumentDetections]: +def enrich_scan_result( + scan_result: ZippedFileScanResult, documents_to_scan: List[Document] +) -> List[DocumentDetections]: logger.debug('enriching scan result') document_detections_list = [] for detections_per_file in scan_result.detections_per_file: @@ -819,49 +818,39 @@ def _is_subpath_of_cycode_configuration_folder(filename: str) -> bool: or filename.endswith(ConfigFileManager.get_config_file_route()) -class CliScanError(NamedTuple): - soft_fail: bool - code: str - message: str - - -CliScanErrors = Dict[Type[Exception], CliScanError] - - def _handle_exception(context: click.Context, e: Exception): context.obj['did_fail'] = True if context.obj['verbose']: click.secho(f'Error: {traceback.format_exc()}', fg='red', nl=False) - # TODO(MarshalX): Create global CLI errors database and move this - errors: CliScanErrors = { - NetworkError: CliScanError( + errors: CliErrors = { + NetworkError: CliError( soft_fail=True, code='cycode_error', message='Cycode was unable to complete this scan. ' 'Please try again by executing the `cycode scan` command' ), - ScanAsyncError: CliScanError( + ScanAsyncError: CliError( soft_fail=True, code='scan_error', message='Cycode was unable to complete this scan. ' 'Please try again by executing the `cycode scan` command' ), - HttpUnauthorizedError: CliScanError( + HttpUnauthorizedError: CliError( soft_fail=True, code='auth_error', message='Unable to authenticate to Cycode, your token is either invalid or has expired. ' 'Please re-generate your token and reconfigure it by running the `cycode configure` command' ), - ZipTooLargeError: CliScanError( + ZipTooLargeError: CliError( soft_fail=True, code='zip_too_large_error', message='The path you attempted to scan exceeds the current maximum scanning size cap (10MB). ' 'Please try ignoring irrelevant paths using the ‘cycode ignore --by-path’ command ' 'and execute the scan again' ), - InvalidGitRepositoryError: CliScanError( + InvalidGitRepositoryError: CliError( soft_fail=False, code='invalid_git_error', message='The path you supplied does not correlate to a git repository. ' @@ -875,7 +864,7 @@ def _handle_exception(context: click.Context, e: Exception): if error.soft_fail is True: context.obj['soft_fail'] = True - return _print_error(context, error) + return ConsolePrinter(context).print_error(error) if isinstance(e, click.ClickException): raise e @@ -883,14 +872,6 @@ def _handle_exception(context: click.Context, e: Exception): raise click.ClickException(str(e)) -def _print_error(context: click.Context, error: CliScanError) -> None: - # TODO(MarshalX): Extend functionality of CLI printers and move this - if context.obj['output'] == 'text': - click.secho(error.message, fg='red', nl=False) - elif context.obj['output'] == 'json': - click.echo(json.dumps({'error': error.code, 'message': error.message}, ensure_ascii=False)) - - def _report_scan_status(context: click.Context, scan_type: str, scan_id: str, scan_completed: bool, output_detections_count: int, all_detections_count: int, files_to_scan_count: int, zip_size: int, command_scan_type: str, error_message: Optional[str]): diff --git a/cycode/cli/models.py b/cycode/cli/models.py index 3cc2899d..011ffe4e 100644 --- a/cycode/cli/models.py +++ b/cycode/cli/models.py @@ -1,5 +1,6 @@ from enum import Enum -from typing import List +from typing import List, NamedTuple, Dict, Type + from cycode.cyclient.models import Detection @@ -43,3 +44,17 @@ def try_get_value(name: str) -> any: return None return Severity[name].value + + +class CliError(NamedTuple): + code: str + message: str + soft_fail: bool = False + + +CliErrors = Dict[Type[Exception], CliError] + + +class CliResult(NamedTuple): + success: bool + message: str diff --git a/cycode/cli/printers/__init__.py b/cycode/cli/printers/__init__.py index 226d4fd7..da9fc379 100644 --- a/cycode/cli/printers/__init__.py +++ b/cycode/cli/printers/__init__.py @@ -1,10 +1,3 @@ -from .json_printer import JsonPrinter -from .text_printer import TextPrinter -from .results_printer import ResultsPrinter +from cycode.cli.printers.console_printer import ConsolePrinter - -__all__ = [ - 'JsonPrinter', - 'TextPrinter', - 'ResultsPrinter' -] +__all__ = ['ConsolePrinter'] diff --git a/cycode/cli/printers/base_printer.py b/cycode/cli/printers/base_printer.py index 28ce1fff..c1a87873 100644 --- a/cycode/cli/printers/base_printer.py +++ b/cycode/cli/printers/base_printer.py @@ -3,16 +3,23 @@ import click -from cycode.cli.models import DocumentDetections +from cycode.cli.models import DocumentDetections, CliResult, CliError class BasePrinter(ABC): - context: click.Context def __init__(self, context: click.Context): self.context = context @abstractmethod - def print_results(self, results: List[DocumentDetections]): + def print_scan_results(self, results: List[DocumentDetections]) -> None: + pass + + @abstractmethod + def print_result(self, result: CliResult) -> None: + pass + + @abstractmethod + def print_error(self, error: CliError) -> None: pass diff --git a/cycode/cli/printers/console_printer.py b/cycode/cli/printers/console_printer.py new file mode 100644 index 00000000..172902df --- /dev/null +++ b/cycode/cli/printers/console_printer.py @@ -0,0 +1,47 @@ +import click +from typing import List, TYPE_CHECKING + +from cycode.cli.consts import SCA_SCAN_TYPE +from cycode.cli.exceptions.custom_exceptions import CycodeError +from cycode.cli.models import DocumentDetections, CliResult, CliError +from cycode.cli.printers.table_printer import TablePrinter +from cycode.cli.printers.json_printer import JsonPrinter +from cycode.cli.printers.text_printer import TextPrinter + +if TYPE_CHECKING: + from cycode.cli.printers.base_printer import BasePrinter + + +class ConsolePrinter: + _AVAILABLE_PRINTERS = { + 'text': TextPrinter, + 'json': JsonPrinter, + 'text_sca': TablePrinter + } + + def __init__(self, context: click.Context): + self.context = context + self.output_type = self.context.obj.get('output') + + self._printer_class = self._AVAILABLE_PRINTERS.get(self.output_type) + if self._printer_class is None: + raise CycodeError(f'"{self.output_type}" output type is not supported.') + + def print_scan_results(self, detections_results_list: List[DocumentDetections]) -> None: + printer = self._get_scan_printer() + printer.print_scan_results(detections_results_list) + + def _get_scan_printer(self) -> 'BasePrinter': + scan_type = self.context.obj.get('scan_type') + + printer_class = self._printer_class + if scan_type == SCA_SCAN_TYPE and self.output_type == 'text': + printer_class = TablePrinter + + return printer_class(self.context) + + def print_result(self, result: CliResult) -> None: + self._printer_class(self.context).print_result(result) + + def print_error(self, error: CliError) -> None: + self._printer_class(self.context).print_error(error) diff --git a/cycode/cli/printers/json_printer.py b/cycode/cli/printers/json_printer.py index 07d32305..2e77aba7 100644 --- a/cycode/cli/printers/json_printer.py +++ b/cycode/cli/printers/json_printer.py @@ -3,30 +3,50 @@ import click -from cycode.cli.models import DocumentDetections +from cycode.cli.models import DocumentDetections, CliResult, CliError from cycode.cli.printers.base_printer import BasePrinter from cycode.cyclient.models import DetectionSchema class JsonPrinter(BasePrinter): - - scan_id: str - def __init__(self, context: click.Context): super().__init__(context) self.scan_id = context.obj.get('scan_id') - def print_results(self, results: List[DocumentDetections]): - detections = [detection for document_detections in results for detection in document_detections.detections] - detections_schema = DetectionSchema(many=True) - detections_dict = detections_schema.dump(detections) - json_result = self._get_json_result(detections_dict) - click.secho(json_result) + def print_result(self, result: CliResult) -> None: + result = { + 'result': result.success, + 'message': result.message + } - def _get_json_result(self, detections): + click.secho(self.get_data_json(result)) + + def print_error(self, error: CliError) -> None: + result = { + 'error': error.code, + 'message': error.message + } + + click.secho(self.get_data_json(result)) + + def print_scan_results(self, results: List[DocumentDetections]) -> None: + detections = [] + for result in results: + detections.extend(result.detections) + + detections_dict = DetectionSchema(many=True).dump(detections) + + click.secho(self._get_json_scan_result(detections_dict)) + + def _get_json_scan_result(self, detections: dict) -> str: result = { 'scan_id': str(self.scan_id), 'detections': detections } - return json.dumps(result, indent=4) + return self.get_data_json(result) + + @staticmethod + def get_data_json(data: dict) -> str: + # ensure_ascii is disabled for symbols like "`". Eg: `cycode scan` + return json.dumps(data, indent=4, ensure_ascii=False) diff --git a/cycode/cli/printers/results_printer.py b/cycode/cli/printers/results_printer.py deleted file mode 100644 index 84e0e575..00000000 --- a/cycode/cli/printers/results_printer.py +++ /dev/null @@ -1,30 +0,0 @@ -import click -from typing import List - -from cycode.cli.consts import SCA_SCAN_TYPE -from cycode.cli.printers import JsonPrinter, TextPrinter -from cycode.cli.models import DocumentDetections -from cycode.cli.printers.table_printer import TablePrinter - - -class ResultsPrinter: - printers = { - 'text': TextPrinter, - 'json': JsonPrinter, - 'text_sca': TablePrinter - } - - def print_results(self, context: click.Context, detections_results_list: List[DocumentDetections], - output_type: str): - printer = self.get_printer(output_type, context) - printer.print_results(detections_results_list) - - def get_printer(self, output_type: str, context: click.Context): - scan_type = context.obj.get('scan_type') - printer = TablePrinter if scan_type is not None and scan_type == SCA_SCAN_TYPE and output_type == 'text' \ - else self.printers.get(output_type) - - if not printer: - raise ValueError(f'the provided output is not supported - {output_type}') - - return printer(context) diff --git a/cycode/cli/printers/table_printer.py b/cycode/cli/printers/table_printer.py index 1a999eb9..fdd699f4 100644 --- a/cycode/cli/printers/table_printer.py +++ b/cycode/cli/printers/table_printer.py @@ -1,11 +1,11 @@ +from collections import defaultdict from typing import List, Dict import click from texttable import Texttable -from cycode.cli.consts import LICENSE_COMPLIANCE_POLICY_ID, \ - PACKAGE_VULNERABILITY_POLICY_ID -from cycode.cli.models import DocumentDetections, Detection +from cycode.cli.consts import LICENSE_COMPLIANCE_POLICY_ID, PACKAGE_VULNERABILITY_POLICY_ID +from cycode.cli.models import DocumentDetections, Detection, CliError, CliResult from cycode.cli.printers.base_printer import BasePrinter SEVERITY_COLUMN = 'Severity' @@ -13,9 +13,14 @@ UPGRADE_COLUMN = 'Upgrade' REPOSITORY_COLUMN = 'Repository' CVE_COLUMN = 'CVE' -PREVIEW_DETECTIONS_COMMON_HEADERS = ['File Path', 'Ecosystem', 'Dependency Name', - 'Direct Dependency', - 'Development Dependency'] + +PREVIEW_DETECTIONS_COMMON_HEADERS = [ + 'File Path', + 'Ecosystem', + 'Dependency Name', + 'Direct Dependency', + 'Development Dependency' +] class TablePrinter(BasePrinter): @@ -23,13 +28,17 @@ class TablePrinter(BasePrinter): WHITE_COLOR_NAME = 'white' GREEN_COLOR_NAME = 'green' - scan_id: str - def __init__(self, context: click.Context): super().__init__(context) self.scan_id = context.obj.get('scan_id') - def print_results(self, results: List[DocumentDetections]): + def print_result(self, result: CliResult) -> None: + raise NotImplemented + + def print_error(self, error: CliError) -> None: + raise NotImplemented + + def print_scan_results(self, results: List[DocumentDetections]): click.secho(f"Scan Results: (scan_id: {self.scan_id})") if not results: @@ -40,78 +49,85 @@ def print_results(self, results: List[DocumentDetections]): self._print_detection_per_detection_type_id(detections_per_detection_type_id) - if self.context.obj.get('report_url'): - click.secho(f"Report URL: {self.context.obj.get('report_url')}") + report_url = self.context.obj.get('report_url') + if report_url: + click.secho(f'Report URL: {report_url}') @staticmethod - def _extract_detections_per_detection_type_id(results: List[DocumentDetections]): - detections_per_detection_type_id = {} + def _extract_detections_per_detection_type_id(results: List[DocumentDetections]) -> Dict[str, List[Detection]]: + detections_per_detection_type_id = defaultdict(list) for document_detection in results: for detection in document_detection.detections: - if detection.detection_type_id not in detections_per_detection_type_id: - detections_per_detection_type_id[detection.detection_type_id] = [] detections_per_detection_type_id[detection.detection_type_id].append(detection) return detections_per_detection_type_id - def _print_detection_per_detection_type_id(self, detections_per_detection_type_id: Dict[str, Detection]): + def _print_detection_per_detection_type_id( + self, detections_per_detection_type_id: Dict[str, List[Detection]] + ) -> None: for detection_type_id in detections_per_detection_type_id: detections = detections_per_detection_type_id[detection_type_id] headers = self._get_table_headers() + + title = None rows = [] - title = "" if detection_type_id == PACKAGE_VULNERABILITY_POLICY_ID: title = "Dependencies Vulnerabilities" + headers = [SEVERITY_COLUMN] + headers headers.extend(PREVIEW_DETECTIONS_COMMON_HEADERS) headers.append(CVE_COLUMN) headers.append(UPGRADE_COLUMN) + for detection in detections: rows.append(self._get_upgrade_package_vulnerability(detection)) - - if detection_type_id == LICENSE_COMPLIANCE_POLICY_ID: + elif detection_type_id == LICENSE_COMPLIANCE_POLICY_ID: title = "License Compliance" + headers.extend(PREVIEW_DETECTIONS_COMMON_HEADERS) headers.append(LICENSE_COLUMN) + for detection in detections: rows.append(self._get_license(detection)) - if len(rows) > 0: - self._print_table_detections(detections, - headers, - rows, - title) + if rows: + self._print_table_detections(detections, headers, rows, title) + + def _get_table_headers(self) -> list: + if self._is_git_repository(): + return [REPOSITORY_COLUMN] - def _get_table_headers(self): - headers = [REPOSITORY_COLUMN] if self._is_git_repository() else [] - return headers + return [] - def _print_table_detections(self, detections: List[Detection], headers: List[str], - rows, title: str): + def _print_table_detections( + self, detections: List[Detection], headers: List[str], rows, title: str + ) -> None: self._print_summary_issues(detections, title) text_table = Texttable() text_table.header(headers) self.set_table_width(headers, text_table) + for row in rows: text_table.add_row(row) + click.echo(text_table.draw()) @staticmethod - def set_table_width(headers, text_table): + def set_table_width(headers: List[str], text_table: Texttable) -> None: header_width_size_cols = [] for header in headers: header_width_size_cols.append(len(header)) + text_table.set_cols_width(header_width_size_cols) @staticmethod - def _print_summary_issues(detections: List, title: str): - click.echo( - f'⛔ Found {len(detections)} issues of type: {click.style(title, bold=True)}') + def _print_summary_issues(detections: List, title: str) -> None: + click.echo(f'⛔ Found {len(detections)} issues of type: {click.style(title, bold=True)}') - def _get_common_detection_fields(self, detection: Detection): + def _get_common_detection_fields(self, detection: Detection) -> List[str]: row = [ detection.detection_details.get('file_name'), detection.detection_details.get('ecosystem'), @@ -126,20 +142,21 @@ def _get_common_detection_fields(self, detection: Detection): return row - def _is_git_repository(self): + def _is_git_repository(self) -> bool: return self.context.obj.get("remote_url") is not None - def _get_upgrade_package_vulnerability(self, detection: Detection): + def _get_upgrade_package_vulnerability(self, detection: Detection) -> List[str]: alert = detection.detection_details.get('alert') - row = [detection.detection_details.get('advisory_severity')] - row.extend(self._get_common_detection_fields(detection)) - upgrade = f'{alert.get("vulnerable_requirements")} -> {alert.get("first_patched_version")}' if alert.get( - "first_patched_version") is not None else '' + row = [detection.detection_details.get('advisory_severity')] + self._get_common_detection_fields(detection) + + upgrade = '' + if alert.get("first_patched_version"): + upgrade = f'{alert.get("vulnerable_requirements")} -> {alert.get("first_patched_version")}' row.append(upgrade) return row - def _get_license(self, detection: Detection): + def _get_license(self, detection: Detection) -> List[str]: row = self._get_common_detection_fields(detection) row.append(f'{detection.detection_details.get("license")}') return row diff --git a/cycode/cli/printers/text_printer.py b/cycode/cli/printers/text_printer.py index 2f840cf3..bb7091c7 100644 --- a/cycode/cli/printers/text_printer.py +++ b/cycode/cli/printers/text_printer.py @@ -4,7 +4,7 @@ import click from cycode.cli.printers.base_printer import BasePrinter -from cycode.cli.models import DocumentDetections, Detection, Document +from cycode.cli.models import DocumentDetections, Detection, Document, CliResult, CliError from cycode.cli.config import config from cycode.cli.consts import SECRET_SCAN_TYPE, COMMIT_RANGE_BASED_COMMAND_SCAN_TYPES from cycode.cli.utils.string_utils import obfuscate_text @@ -15,19 +15,24 @@ class TextPrinter(BasePrinter): WHITE_COLOR_NAME = 'white' GREEN_COLOR_NAME = 'green' - scan_id: str - scan_type: str - command_scan_type: str - show_secret: bool = False - def __init__(self, context: click.Context): super().__init__(context) - self.scan_id = context.obj.get('scan_id') - self.scan_type = context.obj.get('scan_type') - self.command_scan_type = context.info_name - self.show_secret = context.obj.get('show_secret', False) + self.scan_id: str = context.obj.get('scan_id') + self.scan_type: str = context.obj.get('scan_type') + self.command_scan_type: str = context.info_name + self.show_secret: bool = context.obj.get('show_secret', False) + + def print_result(self, result: CliResult) -> None: + color = None + if not result.success: + color = self.RED_COLOR_NAME + + click.secho(result.message, fg=color) - def print_results(self, results: List[DocumentDetections]): + def print_error(self, error: CliError) -> None: + click.secho(error.message, fg=self.RED_COLOR_NAME, nl=False) + + def print_scan_results(self, results: List[DocumentDetections]): click.secho(f"Scan Results: (scan_id: {self.scan_id})") if not results: @@ -37,8 +42,9 @@ def print_results(self, results: List[DocumentDetections]): for document_detections in results: self._print_document_detections(document_detections) - if self.context.obj.get('report_url'): - click.secho(f"Report URL: {self.context.obj.get('report_url')}") + report_url = self.context.obj.get('report_url') + if report_url: + click.secho(f'Report URL: {report_url}') def _print_document_detections(self, document_detections: DocumentDetections): document = document_detections.document @@ -54,9 +60,10 @@ def _print_detection_summary(self, detection: Detection, document_path: str): detection_commit_id = detection.detection_details.get('commit_id') detection_commit_id_message = f'\nCommit SHA: {detection_commit_id}' if detection_commit_id else '' click.echo( - f'⛔ Found issue of type: {click.style(detection_name, fg="bright_red", bold=True)} ' + - f'(rule ID: {detection.detection_rule_id}) in file: {click.format_filename(document_path)} ' + - f'{detection_sha_message}{detection_commit_id_message} ⛔ ') + f'⛔ Found issue of type: {click.style(detection_name, fg="bright_red", bold=True)} ' + f'(rule ID: {detection.detection_rule_id}) in file: {click.format_filename(document_path)} ' + f'{detection_sha_message}{detection_commit_id_message} ⛔' + ) def _print_detection_code_segment(self, detection: Detection, document: Document, code_segment_size: int): if self._is_git_diff_based_scan(): @@ -77,16 +84,19 @@ def _print_line_of_code_segment(self, document: Document, line: str, line_number self._print_line(document, line, line_number) def _print_detection_line(self, document: Document, line: str, line_number: int, detection_position_in_line: int, - violation_length: int): + violation_length: int) -> None: click.echo( f'{self._get_line_number_style(line_number)} ' - f'{self._get_detection_line_style(line, document.is_git_diff_format, detection_position_in_line, violation_length)}') + f'{self._get_detection_line_style(line, document.is_git_diff_format, detection_position_in_line, violation_length)}' + ) def _print_line(self, document: Document, line: str, line_number: int): - click.echo( - f'{self._get_line_number_style(line_number)} {self._get_line_style(line, document.is_git_diff_format)}') + line_no = self._get_line_number_style(line_number) + line = self._get_line_style(line, document.is_git_diff_format) - def _get_detection_line_style(self, line: str, is_git_diff: bool, start_position: int, length: int): + click.echo(f'{line_no} {line}') + + def _get_detection_line_style(self, line: str, is_git_diff: bool, start_position: int, length: int) -> str: line_color = self._get_line_color(line, is_git_diff) if self.scan_type != SECRET_SCAN_TYPE or start_position < 0 or length < 0: return self._get_line_style(line, is_git_diff, line_color) @@ -94,17 +104,23 @@ def _get_detection_line_style(self, line: str, is_git_diff: bool, start_position violation = line[start_position: start_position + length] if not self.show_secret: violation = obfuscate_text(violation) + line_to_violation = line[0: start_position] line_from_violation = line[start_position + length:] + return f'{self._get_line_style(line_to_violation, is_git_diff, line_color)}' \ f'{self._get_line_style(violation, is_git_diff, line_color, underline=True)}' \ f'{self._get_line_style(line_from_violation, is_git_diff, line_color)}' - def _get_line_style(self, line: str, is_git_diff: bool, color: Optional[str] = None, underline: bool = False): - color = color or self._get_line_color(line, is_git_diff) + def _get_line_style( + self, line: str, is_git_diff: bool, color: Optional[str] = None, underline: bool = False + ) -> str: + if color is None: + color = self._get_line_color(line, is_git_diff) + return click.style(line, fg=color, bold=False, underline=underline) - def _get_line_color(self, line: str, is_git_diff: bool): + def _get_line_color(self, line: str, is_git_diff: bool) -> str: if not is_git_diff: return self.WHITE_COLOR_NAME @@ -120,7 +136,8 @@ def _get_position_in_line(self, text: str, position: int) -> int: return position - text.rfind('\n', 0, position) - 1 def _get_line_number_style(self, line_number: int): - return f'{click.style(str(line_number), fg=self.WHITE_COLOR_NAME, bold=False)} {click.style("|", fg=self.RED_COLOR_NAME, bold=False)}' + return f'{click.style(str(line_number), fg=self.WHITE_COLOR_NAME, bold=False)} ' \ + f'{click.style("|", fg=self.RED_COLOR_NAME, bold=False)}' def _get_lines_to_display_count(self) -> int: result_printer_configuration = config.get('result_printer') diff --git a/cycode/cli/utils/string_utils.py b/cycode/cli/utils/string_utils.py index 9d486df0..6d29e3ef 100644 --- a/cycode/cli/utils/string_utils.py +++ b/cycode/cli/utils/string_utils.py @@ -20,10 +20,8 @@ def obfuscate_text(text: str) -> str: obfuscate_regex = re.compile(r"[^+\-\s]") -""" -get the first 1024 chars and check if its binary or not -""" -def is_binary_content(content: str): +def is_binary_content(content: str) -> bool: + """Get the first 1024 chars and check if it's binary or not.""" chunk = content[:1024] chunk_bytes = convert_string_to_bytes(chunk) return is_binary_string(chunk_bytes) diff --git a/cycode/cyclient/cycode_token_based_client.py b/cycode/cyclient/cycode_token_based_client.py index 53200517..e3da7cc2 100644 --- a/cycode/cyclient/cycode_token_based_client.py +++ b/cycode/cyclient/cycode_token_based_client.py @@ -20,6 +20,7 @@ def __init__(self, client_id: str, client_secret: str): @property def api_token(self) -> str: + # TODO(MarshalX): This property performs HTTP request to refresh the token. This must be the method. with self.lock: self.refresh_api_token_if_needed() return self._api_token diff --git a/tests/cyclient/test_auth_client.py b/tests/cyclient/test_auth_client.py index f5952b37..a74b2ec7 100644 --- a/tests/cyclient/test_auth_client.py +++ b/tests/cyclient/test_auth_client.py @@ -143,7 +143,6 @@ def test_get_api_token_success_completed(client: AuthClient, token_url: str, cod @responses.activate def test_get_api_token_http_error_valid_response(client: AuthClient, token_url: str, code_verifier: str): - # TODO(MarshalX): ask Michal about such cases or dive into code of platform expected_status = 'Pending' expected_api_token = None