From cfbf1edfa0f1c26ddb5cc53425d4327e7fa44ba6 Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Fri, 9 Jun 2023 14:51:08 +0200 Subject: [PATCH 1/6] Add table output for all scan types --- README.md | 12 +- cycode/cli/main.py | 8 +- cycode/cli/printers/base_printer.py | 4 +- cycode/cli/printers/console_printer.py | 16 +- cycode/cli/printers/sca_table_printer.py | 166 ++++++++++++++++ cycode/cli/printers/table_printer.py | 236 +++++++++++------------ cycode/cli/printers/text_printer.py | 13 +- cycode/cli/utils/string_utils.py | 4 + 8 files changed, 313 insertions(+), 146 deletions(-) create mode 100644 cycode/cli/printers/sca_table_printer.py diff --git a/README.md b/README.md index 8b79a7a3..370001e7 100644 --- a/README.md +++ b/README.md @@ -211,12 +211,12 @@ repos: The following are the options and commands available with the Cycode CLI application: -| Option | Description | -|-------------------------|-----------------------------------------------------------| -| `--output [text\|json]` | Specify the output (`text`/`json`). The default is `text` | -| `-v`, `--verbose` | Show detailed logs | -| `--version` | Show the version and exit. | -| `--help` | Show options for given command. | +| Option | Description | +|--------------------------------|-------------------------------------------------------------------| +| `--output [text\|json\|table]` | Specify the output (`text`/`json`/`table`). The default is `text` | +| `-v`, `--verbose` | Show detailed logs | +| `--version` | Show the version and exit. | +| `--help` | Show options for given command. | | Command | Description | |-------------------------------------|-------------| diff --git a/cycode/cli/main.py b/cycode/cli/main.py index 9c94bfa0..9eee8343 100644 --- a/cycode/cli/main.py +++ b/cycode/cli/main.py @@ -65,10 +65,10 @@ @click.option('--output', default=None, help=""" \b - Specify the results output (text/json), + Specify the results output (text/json/table), the default is text """, - type=click.Choice(['text', 'json'])) + type=click.Choice(['text', 'json', 'table'])) @click.option('--severity-threshold', default=None, help='Show only violations at the specified level or higher (supported for SCA scan type only).', @@ -142,8 +142,8 @@ def finalize(context: click.Context, *args, **kwargs): @click.option( '--output', default='text', - help='Specify the output (text/json), the default is text', - type=click.Choice(['text', 'json']) + help='Specify the output (text/json/table), the default is text', + type=click.Choice(['text', 'json', 'table']) ) @click.option( '--user-agent', diff --git a/cycode/cli/printers/base_printer.py b/cycode/cli/printers/base_printer.py index c1a87873..e5450e8f 100644 --- a/cycode/cli/printers/base_printer.py +++ b/cycode/cli/printers/base_printer.py @@ -7,7 +7,9 @@ class BasePrinter(ABC): - context: click.Context + RED_COLOR_NAME = 'red' + WHITE_COLOR_NAME = 'white' + GREEN_COLOR_NAME = 'green' def __init__(self, context: click.Context): self.context = context diff --git a/cycode/cli/printers/console_printer.py b/cycode/cli/printers/console_printer.py index 172902df..9932e140 100644 --- a/cycode/cli/printers/console_printer.py +++ b/cycode/cli/printers/console_printer.py @@ -1,10 +1,10 @@ import click from typing import List, TYPE_CHECKING -from cycode.cli.consts import SCA_SCAN_TYPE from cycode.cli.exceptions.custom_exceptions import CycodeError from cycode.cli.models import DocumentDetections, CliResult, CliError from cycode.cli.printers.table_printer import TablePrinter +from cycode.cli.printers.sca_table_printer import SCATablePrinter from cycode.cli.printers.json_printer import JsonPrinter from cycode.cli.printers.text_printer import TextPrinter @@ -16,11 +16,15 @@ class ConsolePrinter: _AVAILABLE_PRINTERS = { 'text': TextPrinter, 'json': JsonPrinter, - 'text_sca': TablePrinter + 'table': TablePrinter, + # overrides + 'table_sca': SCATablePrinter, + 'text_sca': SCATablePrinter, } def __init__(self, context: click.Context): self.context = context + self.scan_type = self.context.obj.get('scan_type') self.output_type = self.context.obj.get('output') self._printer_class = self._AVAILABLE_PRINTERS.get(self.output_type) @@ -32,11 +36,11 @@ def print_scan_results(self, detections_results_list: List[DocumentDetections]) printer.print_scan_results(detections_results_list) def _get_scan_printer(self) -> 'BasePrinter': - scan_type = self.context.obj.get('scan_type') - printer_class = self._printer_class - if scan_type == SCA_SCAN_TYPE and self.output_type == 'text': - printer_class = TablePrinter + + composite_printer = self._AVAILABLE_PRINTERS.get(f'{self.output_type}_{self.scan_type}') + if composite_printer: + printer_class = composite_printer return printer_class(self.context) diff --git a/cycode/cli/printers/sca_table_printer.py b/cycode/cli/printers/sca_table_printer.py new file mode 100644 index 00000000..dcd0824d --- /dev/null +++ b/cycode/cli/printers/sca_table_printer.py @@ -0,0 +1,166 @@ +from collections import defaultdict +from typing import List, Dict + +import click +from texttable import Texttable + +from cycode.cli.consts import LICENSE_COMPLIANCE_POLICY_ID, PACKAGE_VULNERABILITY_POLICY_ID +from cycode.cli.models import DocumentDetections, Detection, CliError, CliResult +from cycode.cli.printers.base_printer import BasePrinter + +SEVERITY_COLUMN = 'Severity' +LICENSE_COLUMN = 'License' +UPGRADE_COLUMN = 'Upgrade' +REPOSITORY_COLUMN = 'Repository' +CVE_COLUMN = 'CVE' + +PREVIEW_DETECTIONS_COMMON_HEADERS = [ + 'File Path', + 'Ecosystem', + 'Dependency Name', + 'Direct Dependency', + 'Development Dependency' +] + + +class SCATablePrinter(BasePrinter): + def __init__(self, context: click.Context): + super().__init__(context) + self.scan_id = context.obj.get('scan_id') + + def print_result(self, result: CliResult) -> None: + raise NotImplemented + + def print_error(self, error: CliError) -> None: + raise NotImplemented + + def print_scan_results(self, results: List[DocumentDetections]): + click.secho(f"Scan Results: (scan_id: {self.scan_id})") + + if not results: + click.secho("Good job! No issues were found!!! 👏👏👏", fg=self.GREEN_COLOR_NAME) + return + + detections_per_detection_type_id = self._extract_detections_per_detection_type_id(results) + + self._print_detection_per_detection_type_id(detections_per_detection_type_id) + + report_url = self.context.obj.get('report_url') + if report_url: + click.secho(f'Report URL: {report_url}') + + @staticmethod + def _extract_detections_per_detection_type_id(results: List[DocumentDetections]) -> Dict[str, List[Detection]]: + detections_per_detection_type_id = defaultdict(list) + + for document_detection in results: + for detection in document_detection.detections: + detections_per_detection_type_id[detection.detection_type_id].append(detection) + + return detections_per_detection_type_id + + def _print_detection_per_detection_type_id( + self, detections_per_detection_type_id: Dict[str, List[Detection]] + ) -> None: + for detection_type_id in detections_per_detection_type_id: + detections = detections_per_detection_type_id[detection_type_id] + headers = self._get_table_headers() + + title = None + rows = [] + + if detection_type_id == PACKAGE_VULNERABILITY_POLICY_ID: + title = "Dependencies Vulnerabilities" + + headers = [SEVERITY_COLUMN] + headers + headers.extend(PREVIEW_DETECTIONS_COMMON_HEADERS) + headers.append(CVE_COLUMN) + headers.append(UPGRADE_COLUMN) + + for detection in detections: + rows.append(self._get_upgrade_package_vulnerability(detection)) + elif detection_type_id == LICENSE_COMPLIANCE_POLICY_ID: + title = "License Compliance" + + headers.extend(PREVIEW_DETECTIONS_COMMON_HEADERS) + headers.append(LICENSE_COLUMN) + + for detection in detections: + rows.append(self._get_license(detection)) + + if rows: + self._print_table_detections(detections, headers, rows, title) + + def _get_table_headers(self) -> list: + if self._is_git_repository(): + return [REPOSITORY_COLUMN] + + return [] + + def _print_table_detections( + self, detections: List[Detection], headers: List[str], rows, title: str + ) -> None: + self._print_summary_issues(detections, title) + text_table = Texttable() + text_table.header(headers) + + self.set_table_width(headers, text_table) + + for row in rows: + text_table.add_row(row) + + click.echo(text_table.draw()) + + @staticmethod + def set_table_width(headers: List[str], text_table: Texttable) -> None: + header_width_size_cols = [] + for header in headers: + header_len = len(header) + if header == CVE_COLUMN: + header_width_size_cols.append(header_len * 5) + elif header == UPGRADE_COLUMN: + header_width_size_cols.append(header_len * 2) + else: + header_width_size_cols.append(header_len) + text_table.set_cols_width(header_width_size_cols) + + @staticmethod + def _print_summary_issues(detections: List, title: str) -> None: + click.echo(f'⛔ Found {len(detections)} issues of type: {click.style(title, bold=True)}') + + def _get_common_detection_fields(self, detection: Detection) -> List[str]: + row = [ + detection.detection_details.get('file_name'), + detection.detection_details.get('ecosystem'), + detection.detection_details.get('package_name'), + detection.detection_details.get('is_direct_dependency_str'), + detection.detection_details.get('is_dev_dependency_str') + ] + + if self._is_git_repository(): + row = [detection.detection_details.get('repository_name')] + row + + return row + + def _is_git_repository(self) -> bool: + return self.context.obj.get("remote_url") is not None + + def _get_upgrade_package_vulnerability(self, detection: Detection) -> List[str]: + alert = detection.detection_details.get('alert') + row = [ + detection.detection_details.get('advisory_severity'), + *self._get_common_detection_fields(detection), + detection.detection_details.get('vulnerability_id') + ] + + upgrade = '' + if alert.get("first_patched_version"): + upgrade = f'{alert.get("vulnerable_requirements")} -> {alert.get("first_patched_version")}' + row.append(upgrade) + + return row + + def _get_license(self, detection: Detection) -> List[str]: + row = self._get_common_detection_fields(detection) + row.append(f'{detection.detection_details.get("license")}') + return row diff --git a/cycode/cli/printers/table_printer.py b/cycode/cli/printers/table_printer.py index e620cdb2..53a611e7 100644 --- a/cycode/cli/printers/table_printer.py +++ b/cycode/cli/printers/table_printer.py @@ -1,170 +1,168 @@ -from collections import defaultdict -from typing import List, Dict +from typing import List, NamedTuple import click from texttable import Texttable -from cycode.cli.consts import LICENSE_COMPLIANCE_POLICY_ID, PACKAGE_VULNERABILITY_POLICY_ID -from cycode.cli.models import DocumentDetections, Detection, CliError, CliResult +from cycode.cli.printers.text_printer import TextPrinter +from cycode.cli.utils.string_utils import obfuscate_text, get_position_in_line +from cycode.cli.consts import SECRET_SCAN_TYPE, SAST_SCAN_TYPE, INFRA_CONFIGURATION_SCAN_TYPE +from cycode.cli.models import DocumentDetections, Detection, CliError, CliResult, Document from cycode.cli.printers.base_printer import BasePrinter -SEVERITY_COLUMN = 'Severity' -LICENSE_COLUMN = 'License' -UPGRADE_COLUMN = 'Upgrade' -REPOSITORY_COLUMN = 'Repository' -CVE_COLUMN = 'CVE' - -PREVIEW_DETECTIONS_COMMON_HEADERS = [ - 'File Path', - 'Ecosystem', - 'Dependency Name', - 'Direct Dependency', - 'Development Dependency' + +class ColumnInfo(NamedTuple): + name: str + width_secret: int = 1 + width_sast: int = 1 + width_iac: int = 1 + + +VIOLATION_COLUMN = ColumnInfo(name='Violation', width_secret=2) +SECRET_SHA_COLUMN = ColumnInfo(name='Secret SHA', width_secret=2) +COMMIT_SHA_COLUMN = ColumnInfo(name='Commit SHA') +VIOLATION_LENGTH_COLUMN = ColumnInfo(name='Violation Length') + +DETECTIONS_COMMON_HEADERS = [ + ColumnInfo(name='Issue Type', width_secret=2, width_iac=4, width_sast=7), + ColumnInfo(name='Rule ID', width_secret=2, width_iac=3, width_sast=2), + ColumnInfo(name='File Path', width_secret=2, width_iac=3, width_sast=3), + ColumnInfo(name='Line Number'), + ColumnInfo(name='Column Number'), ] class TablePrinter(BasePrinter): - RED_COLOR_NAME = 'red' - WHITE_COLOR_NAME = 'white' - GREEN_COLOR_NAME = 'green' - def __init__(self, context: click.Context): super().__init__(context) - self.scan_id = context.obj.get('scan_id') + self.context = context + self.scan_id: str = context.obj.get('scan_id') + self.scan_type: str = context.obj.get('scan_type') + self.show_secret: bool = context.obj.get('show_secret', False) def print_result(self, result: CliResult) -> None: - raise NotImplemented + TextPrinter(self.context).print_result(result) def print_error(self, error: CliError) -> None: - raise NotImplemented + TextPrinter(self.context).print_error(error) def print_scan_results(self, results: List[DocumentDetections]): - click.secho(f"Scan Results: (scan_id: {self.scan_id})") + click.secho(f'Scan Results: (scan_id: {self.scan_id})') if not results: - click.secho("Good job! No issues were found!!! 👏👏👏", fg=self.GREEN_COLOR_NAME) + click.secho('Good job! No issues were found!!! 👏👏👏', fg=self.GREEN_COLOR_NAME) return - detections_per_detection_type_id = self._extract_detections_per_detection_type_id(results) - - self._print_detection_per_detection_type_id(detections_per_detection_type_id) + self._print_results(results) report_url = self.context.obj.get('report_url') if report_url: click.secho(f'Report URL: {report_url}') - @staticmethod - def _extract_detections_per_detection_type_id(results: List[DocumentDetections]) -> Dict[str, List[Detection]]: - detections_per_detection_type_id = defaultdict(list) + def _print_results(self, results: List[DocumentDetections]) -> None: + headers = self._get_table_headers() - for document_detection in results: - for detection in document_detection.detections: - detections_per_detection_type_id[detection.detection_type_id].append(detection) + rows = [] + for detections in results: + for detection in detections.detections: + rows.append(self._get_detection_row(detection, detections.document)) - return detections_per_detection_type_id + if rows: + self._print_table(headers, rows) - def _print_detection_per_detection_type_id( - self, detections_per_detection_type_id: Dict[str, List[Detection]] - ) -> None: - for detection_type_id in detections_per_detection_type_id: - detections = detections_per_detection_type_id[detection_type_id] - headers = self._get_table_headers() + def _print_table(self, headers: List[ColumnInfo], rows: List[List[str]]) -> None: + text_table = Texttable() + text_table.header([header.name for header in headers]) + text_table.set_cols_width(self._get_table_columns_width(headers)) - title = None - rows = [] + for row in rows: + text_table.add_row(row) - if detection_type_id == PACKAGE_VULNERABILITY_POLICY_ID: - title = "Dependencies Vulnerabilities" + click.echo(text_table.draw()) - headers = [SEVERITY_COLUMN] + headers - headers.extend(PREVIEW_DETECTIONS_COMMON_HEADERS) - headers.append(CVE_COLUMN) - headers.append(UPGRADE_COLUMN) + def _is_git_repository(self) -> bool: + return self.context.obj.get('remote_url') is not None - for detection in detections: - rows.append(self._get_upgrade_package_vulnerability(detection)) - elif detection_type_id == LICENSE_COMPLIANCE_POLICY_ID: - title = "License Compliance" + def _get_table_headers(self) -> list: + headers = DETECTIONS_COMMON_HEADERS.copy() - headers.extend(PREVIEW_DETECTIONS_COMMON_HEADERS) - headers.append(LICENSE_COLUMN) + if self._is_git_repository(): + headers.insert(3, COMMIT_SHA_COLUMN) - for detection in detections: - rows.append(self._get_license(detection)) + if self.scan_type == SECRET_SCAN_TYPE: + headers.insert(3, SECRET_SHA_COLUMN) + headers.append(VIOLATION_LENGTH_COLUMN) + headers.append(VIOLATION_COLUMN) - if rows: - self._print_table_detections(detections, headers, rows, title) + return headers - def _get_table_headers(self) -> list: - if self._is_git_repository(): - return [REPOSITORY_COLUMN] + def _get_table_columns_width(self, headers: List[ColumnInfo]) -> List[int]: + header_width_size_cols = [] + for header in headers: + width_multiplier = 1 + if self.scan_type == SECRET_SCAN_TYPE: + width_multiplier = header.width_secret + elif self.scan_type == INFRA_CONFIGURATION_SCAN_TYPE: + width_multiplier = header.width_iac + elif self.scan_type == SAST_SCAN_TYPE: + width_multiplier = header.width_sast + + header_width_size_cols.append(len(header.name) * width_multiplier) + + return header_width_size_cols + + def _get_detection_row(self, detection: Detection, document: Document) -> List[str]: + return [ + *self._get_detection_summary_fields(detection, document.path), + *self._get_detection_code_segment_fields(detection, document), + ] - return [] + def _get_detection_summary_fields(self, detection: Detection, document_path: str) -> List[str]: + issue_type = detection.message + if self.scan_type == SECRET_SCAN_TYPE: + issue_type = detection.type - def _print_table_detections( - self, detections: List[Detection], headers: List[str], rows, title: str - ) -> None: - self._print_summary_issues(detections, title) - text_table = Texttable() - text_table.header(headers) + rows = [ + issue_type, + detection.detection_rule_id, + click.format_filename(document_path), + ] - self.set_table_width(headers, text_table) + if self.scan_type == SECRET_SCAN_TYPE: + rows.append(detection.detection_details.get('sha512', '')) - for row in rows: - text_table.add_row(row) + if self._is_git_repository(): + rows.append(detection.detection_details.get('commit_id', '')) - click.echo(text_table.draw()) + return rows - @staticmethod - def set_table_width(headers: List[str], text_table: Texttable) -> None: - header_width_size_cols = [] - for header in headers: - header_len = len(header) - if header == CVE_COLUMN: - header_width_size_cols.append(header_len * 5) - elif header == UPGRADE_COLUMN: - header_width_size_cols.append(header_len * 2) - else: - header_width_size_cols.append(header_len) - text_table.set_cols_width(header_width_size_cols) - - @staticmethod - def _print_summary_issues(detections: List, title: str) -> None: - click.echo(f'⛔ Found {len(detections)} issues of type: {click.style(title, bold=True)}') - - def _get_common_detection_fields(self, detection: Detection) -> List[str]: - row = [ - detection.detection_details.get('file_name'), - detection.detection_details.get('ecosystem'), - detection.detection_details.get('package_name'), - detection.detection_details.get('is_direct_dependency_str'), - detection.detection_details.get('is_dev_dependency_str') - ] + def _get_detection_code_segment_fields(self, detection: Detection, document: Document) -> List[str]: + detection_details = detection.detection_details - if self._is_git_repository(): - row = [detection.detection_details.get('repository_name')] + row + detection_line = detection_details.get('line_in_file', -1) + if self.scan_type == SECRET_SCAN_TYPE: + detection_line = detection_details.get('line', -1) - return row + detection_position = get_position_in_line(document.content, detection_details.get('start_position', -1)) + violation_length = detection_details.get('length', -1) - def _is_git_repository(self) -> bool: - return self.context.obj.get("remote_url") is not None - - def _get_upgrade_package_vulnerability(self, detection: Detection) -> List[str]: - alert = detection.detection_details.get('alert') - row = [ - detection.detection_details.get('advisory_severity'), - *self._get_common_detection_fields(detection), - detection.detection_details.get('vulnerability_id') + rows = [ + detection_line, + detection_position, ] - upgrade = '' - if alert.get("first_patched_version"): - upgrade = f'{alert.get("vulnerable_requirements")} -> {alert.get("first_patched_version")}' - row.append(upgrade) + if self.scan_type == SECRET_SCAN_TYPE: + rows.append(f'{violation_length} chars') + + violation = '' + + file_content_lines = document.content.splitlines() + if detection_line < len(file_content_lines): + line = file_content_lines[detection_line] + violation = line[detection_position: detection_position + violation_length] + + if not self.show_secret: + violation = obfuscate_text(violation) - return row + rows.append(violation) - def _get_license(self, detection: Detection) -> List[str]: - row = self._get_common_detection_fields(detection) - row.append(f'{detection.detection_details.get("license")}') - return row + return rows diff --git a/cycode/cli/printers/text_printer.py b/cycode/cli/printers/text_printer.py index bb7091c7..c3a655d6 100644 --- a/cycode/cli/printers/text_printer.py +++ b/cycode/cli/printers/text_printer.py @@ -7,14 +7,10 @@ from cycode.cli.models import DocumentDetections, Detection, Document, CliResult, CliError from cycode.cli.config import config from cycode.cli.consts import SECRET_SCAN_TYPE, COMMIT_RANGE_BASED_COMMAND_SCAN_TYPES -from cycode.cli.utils.string_utils import obfuscate_text +from cycode.cli.utils.string_utils import obfuscate_text, get_position_in_line class TextPrinter(BasePrinter): - RED_COLOR_NAME = 'red' - WHITE_COLOR_NAME = 'white' - GREEN_COLOR_NAME = 'green' - def __init__(self, context: click.Context): super().__init__(context) self.scan_id: str = context.obj.get('scan_id') @@ -132,9 +128,6 @@ def _get_line_color(self, line: str, is_git_diff: bool) -> str: return self.WHITE_COLOR_NAME - def _get_position_in_line(self, text: str, position: int) -> int: - return position - text.rfind('\n', 0, position) - 1 - def _get_line_number_style(self, line_number: int): return f'{click.style(str(line_number), fg=self.WHITE_COLOR_NAME, bold=False)} ' \ f'{click.style("|", fg=self.RED_COLOR_NAME, bold=False)}' @@ -158,7 +151,7 @@ def _print_detection_from_file(self, detection: Detection, document: Document, c file_content = document.content file_lines = file_content.splitlines() start_line = self._get_code_segment_start_line(detection_line, code_segment_size) - detection_position_in_line = self._get_position_in_line(file_content, detection_position) + detection_position_in_line = get_position_in_line(file_content, detection_position) click.echo() for i in range(code_segment_size): @@ -182,7 +175,7 @@ def _print_detection_from_git_diff(self, detection: Detection, document: Documen git_diff_content = document.content git_diff_lines = git_diff_content.splitlines() detection_line = git_diff_lines[detection_line_number] - detection_position_in_line = self._get_position_in_line(git_diff_content, detection_position) + detection_position_in_line = get_position_in_line(git_diff_content, detection_position) click.echo() self._print_detection_line(document, detection_line, detection_line_number_in_original_file, diff --git a/cycode/cli/utils/string_utils.py b/cycode/cli/utils/string_utils.py index 6d29e3ef..0e7d0c23 100644 --- a/cycode/cli/utils/string_utils.py +++ b/cycode/cli/utils/string_utils.py @@ -43,3 +43,7 @@ def generate_random_string(string_len: int): # letters, digits, and symbols characters = string.ascii_letters + string.digits + string.punctuation return ''.join(random.choice(characters) for _ in range(string_len)) + + +def get_position_in_line(text: str, position: int) -> int: + return position - text.rfind('\n', 0, position) - 1 From f3354d36c521410c64c05829f7f227dc7f5a2113 Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Mon, 12 Jun 2023 11:58:55 +0200 Subject: [PATCH 2/6] New table management --- cycode/cli/printers/base_table_printer.py | 112 +++++++++++++ cycode/cli/printers/sca_table_printer.py | 32 +--- cycode/cli/printers/table_printer.py | 188 +++++++--------------- 3 files changed, 175 insertions(+), 157 deletions(-) create mode 100644 cycode/cli/printers/base_table_printer.py diff --git a/cycode/cli/printers/base_table_printer.py b/cycode/cli/printers/base_table_printer.py new file mode 100644 index 00000000..1161dfc3 --- /dev/null +++ b/cycode/cli/printers/base_table_printer.py @@ -0,0 +1,112 @@ +import abc +from typing import List, NamedTuple, Dict, Optional + +import click +from texttable import Texttable + +from cycode.cli.consts import SECRET_SCAN_TYPE, INFRA_CONFIGURATION_SCAN_TYPE, SAST_SCAN_TYPE +from cycode.cli.printers.text_printer import TextPrinter +from cycode.cli.models import DocumentDetections, CliError, CliResult +from cycode.cli.printers.base_printer import BasePrinter + + +class ColumnInfo(NamedTuple): + name: str + index: int # lower index means left column + width_secret: int = 1 + width_sast: int = 1 + width_iac: int = 1 + + +class Table: + """Helper class to manage columns and their values in the right order and only if the column should be presented.""" + + def __init__(self, scan_type: str, columns_info: Optional[List[ColumnInfo]] = None): + self.scan_type = scan_type + + self._columns: Dict[ColumnInfo, List[str]] = dict() + if columns_info: + self._columns: Dict[ColumnInfo, List[str]] = {columns: list() for columns in columns_info} + + def add(self, column: ColumnInfo): + self._columns[column] = list() + + def set(self, column: ColumnInfo, value: str): + # we pash values only for existing columns what were added before + if column in self._columns: + self._columns[column].append(value) + + def _get_ordered_columns(self) -> List[ColumnInfo]: + # we are sorting columns by index to make sure that columns will be printed in the right order + return sorted(self._columns, key=lambda column_info: column_info.index) + + def get_columns_info(self) -> List[ColumnInfo]: + return self._get_ordered_columns() + + def get_headers(self) -> List[str]: + return [header.name for header in self._get_ordered_columns()] + + def get_rows(self) -> List[str]: + ordered_values = [self._columns[column_info] for column_info in self._get_ordered_columns()] + return list(zip(*ordered_values)) + + def _get_table_columns_width(self) -> List[int]: + header_width_size_cols = [] + for header in self.get_columns_info(): + width_multiplier = 1 + if self.scan_type == SECRET_SCAN_TYPE: + width_multiplier = header.width_secret + elif self.scan_type == INFRA_CONFIGURATION_SCAN_TYPE: + width_multiplier = header.width_iac + elif self.scan_type == SAST_SCAN_TYPE: + width_multiplier = header.width_sast + + header_width_size_cols.append(len(header.name) * width_multiplier) + + return header_width_size_cols + + def get_table(self, max_width: int = 80) -> Texttable: + table = Texttable(max_width) + table.header(self.get_headers()) + + for row in self.get_rows(): + table.add_row(row) + + table.set_cols_width(self._get_table_columns_width()) + + return table + + +class BaseTablePrinter(BasePrinter, abc.ABC): + def __init__(self, context: click.Context): + super().__init__(context) + self.context = context + self.scan_id: str = context.obj.get('scan_id') + self.scan_type: str = context.obj.get('scan_type') + self.show_secret: bool = context.obj.get('show_secret', False) + + def print_result(self, result: CliResult) -> None: + TextPrinter(self.context).print_result(result) + + def print_error(self, error: CliError) -> None: + TextPrinter(self.context).print_error(error) + + def print_scan_results(self, results: List[DocumentDetections]): + click.secho(f'Scan Results: (scan_id: {self.scan_id})') + + if not results: + click.secho('Good job! No issues were found!!! 👏👏👏', fg=self.GREEN_COLOR_NAME) + return + + self._print_results(results) + + report_url = self.context.obj.get('report_url') + if report_url: + click.secho(f'Report URL: {report_url}') + + def _is_git_repository(self) -> bool: + return self.context.obj.get('remote_url') is not None + + @abc.abstractmethod + def _print_results(self, results: List[DocumentDetections]) -> None: + raise NotImplemented diff --git a/cycode/cli/printers/sca_table_printer.py b/cycode/cli/printers/sca_table_printer.py index dcd0824d..34130607 100644 --- a/cycode/cli/printers/sca_table_printer.py +++ b/cycode/cli/printers/sca_table_printer.py @@ -5,8 +5,8 @@ from texttable import Texttable from cycode.cli.consts import LICENSE_COMPLIANCE_POLICY_ID, PACKAGE_VULNERABILITY_POLICY_ID -from cycode.cli.models import DocumentDetections, Detection, CliError, CliResult -from cycode.cli.printers.base_printer import BasePrinter +from cycode.cli.models import DocumentDetections, Detection +from cycode.cli.printers.base_table_printer import BaseTablePrinter SEVERITY_COLUMN = 'Severity' LICENSE_COLUMN = 'License' @@ -23,32 +23,11 @@ ] -class SCATablePrinter(BasePrinter): - def __init__(self, context: click.Context): - super().__init__(context) - self.scan_id = context.obj.get('scan_id') - - def print_result(self, result: CliResult) -> None: - raise NotImplemented - - def print_error(self, error: CliError) -> None: - raise NotImplemented - - def print_scan_results(self, results: List[DocumentDetections]): - click.secho(f"Scan Results: (scan_id: {self.scan_id})") - - if not results: - click.secho("Good job! No issues were found!!! 👏👏👏", fg=self.GREEN_COLOR_NAME) - return - +class SCATablePrinter(BaseTablePrinter): + def _print_results(self, results: List[DocumentDetections]) -> None: detections_per_detection_type_id = self._extract_detections_per_detection_type_id(results) - self._print_detection_per_detection_type_id(detections_per_detection_type_id) - report_url = self.context.obj.get('report_url') - if report_url: - click.secho(f'Report URL: {report_url}') - @staticmethod def _extract_detections_per_detection_type_id(results: List[DocumentDetections]) -> Dict[str, List[Detection]]: detections_per_detection_type_id = defaultdict(list) @@ -142,9 +121,6 @@ def _get_common_detection_fields(self, detection: Detection) -> List[str]: return row - def _is_git_repository(self) -> bool: - return self.context.obj.get("remote_url") is not None - def _get_upgrade_package_vulnerability(self, detection: Detection) -> List[str]: alert = detection.detection_details.get('alert') row = [ diff --git a/cycode/cli/printers/table_printer.py b/cycode/cli/printers/table_printer.py index 53a611e7..29fe4fe0 100644 --- a/cycode/cli/printers/table_printer.py +++ b/cycode/cli/printers/table_printer.py @@ -1,168 +1,98 @@ -from typing import List, NamedTuple +from typing import List import click -from texttable import Texttable -from cycode.cli.printers.text_printer import TextPrinter +from cycode.cli.printers.base_table_printer import BaseTablePrinter, ColumnInfo, Table from cycode.cli.utils.string_utils import obfuscate_text, get_position_in_line -from cycode.cli.consts import SECRET_SCAN_TYPE, SAST_SCAN_TYPE, INFRA_CONFIGURATION_SCAN_TYPE -from cycode.cli.models import DocumentDetections, Detection, CliError, CliResult, Document -from cycode.cli.printers.base_printer import BasePrinter +from cycode.cli.consts import SECRET_SCAN_TYPE +from cycode.cli.models import DocumentDetections, Detection, Document +# we are using indexes like 10 20 30 to have space between for future columns inserts -class ColumnInfo(NamedTuple): - name: str - width_secret: int = 1 - width_sast: int = 1 - width_iac: int = 1 +# optional +SECRET_SHA_COLUMN = ColumnInfo(name='Secret SHA', index=31, width_secret=2) +COMMIT_SHA_COLUMN = ColumnInfo(name='Commit SHA', index=32) +VIOLATION_LENGTH_COLUMN = ColumnInfo(name='Violation Length', index=51) +VIOLATION_COLUMN = ColumnInfo(name='Violation', index=52, width_secret=2) +# required +ISSUE_TYPE_COLUMN = ColumnInfo(name='Issue Type', index=10, width_secret=2, width_iac=4, width_sast=7) +RULE_ID_COLUMN = ColumnInfo(name='Rule ID', index=20, width_secret=2, width_iac=3, width_sast=2) +FILE_PATH_COLUMN = ColumnInfo(name='File Path', index=30, width_secret=2, width_iac=3, width_sast=3) +LINE_NUMBER_COLUMN = ColumnInfo(name='Line Number', index=40) +COLUMN_NUMBER_COLUMN = ColumnInfo(name='Column Number', index=50) -VIOLATION_COLUMN = ColumnInfo(name='Violation', width_secret=2) -SECRET_SHA_COLUMN = ColumnInfo(name='Secret SHA', width_secret=2) -COMMIT_SHA_COLUMN = ColumnInfo(name='Commit SHA') -VIOLATION_LENGTH_COLUMN = ColumnInfo(name='Violation Length') - -DETECTIONS_COMMON_HEADERS = [ - ColumnInfo(name='Issue Type', width_secret=2, width_iac=4, width_sast=7), - ColumnInfo(name='Rule ID', width_secret=2, width_iac=3, width_sast=2), - ColumnInfo(name='File Path', width_secret=2, width_iac=3, width_sast=3), - ColumnInfo(name='Line Number'), - ColumnInfo(name='Column Number'), -] - - -class TablePrinter(BasePrinter): - def __init__(self, context: click.Context): - super().__init__(context) - self.context = context - self.scan_id: str = context.obj.get('scan_id') - self.scan_type: str = context.obj.get('scan_type') - self.show_secret: bool = context.obj.get('show_secret', False) - - def print_result(self, result: CliResult) -> None: - TextPrinter(self.context).print_result(result) - - def print_error(self, error: CliError) -> None: - TextPrinter(self.context).print_error(error) - - def print_scan_results(self, results: List[DocumentDetections]): - click.secho(f'Scan Results: (scan_id: {self.scan_id})') - - if not results: - click.secho('Good job! No issues were found!!! 👏👏👏', fg=self.GREEN_COLOR_NAME) - return - - self._print_results(results) - - report_url = self.context.obj.get('report_url') - if report_url: - click.secho(f'Report URL: {report_url}') - +class TablePrinter(BaseTablePrinter): def _print_results(self, results: List[DocumentDetections]) -> None: - headers = self._get_table_headers() + table = self._get_table() - rows = [] for detections in results: for detection in detections.detections: - rows.append(self._get_detection_row(detection, detections.document)) + self._enrich_table_with_values(table, detection, detections.document) - if rows: - self._print_table(headers, rows) + click.echo(table.get_table().draw()) - def _print_table(self, headers: List[ColumnInfo], rows: List[List[str]]) -> None: - text_table = Texttable() - text_table.header([header.name for header in headers]) - text_table.set_cols_width(self._get_table_columns_width(headers)) + def _get_table(self) -> Table: + """Returns map (under the hood) of table headers and their values""" + columns = Table(self.scan_type) - for row in rows: - text_table.add_row(row) + columns.add(ISSUE_TYPE_COLUMN) + columns.add(RULE_ID_COLUMN) + columns.add(FILE_PATH_COLUMN) + columns.add(LINE_NUMBER_COLUMN) + columns.add(COLUMN_NUMBER_COLUMN) - click.echo(text_table.draw()) + if self._is_git_repository(): + columns.add(COMMIT_SHA_COLUMN) - def _is_git_repository(self) -> bool: - return self.context.obj.get('remote_url') is not None + if self.scan_type == SECRET_SCAN_TYPE: + columns.add(SECRET_SHA_COLUMN) + columns.add(VIOLATION_LENGTH_COLUMN) + columns.add(VIOLATION_COLUMN) - def _get_table_headers(self) -> list: - headers = DETECTIONS_COMMON_HEADERS.copy() + return columns - if self._is_git_repository(): - headers.insert(3, COMMIT_SHA_COLUMN) + def _enrich_table_with_values(self, table: Table, detection: Detection, document: Document) -> None: + self._enrich_table_with_detection_summary_values(table, detection, document) + self._enrich_table_with_detection_code_segment_values(table, detection, document) - if self.scan_type == SECRET_SCAN_TYPE: - headers.insert(3, SECRET_SHA_COLUMN) - headers.append(VIOLATION_LENGTH_COLUMN) - headers.append(VIOLATION_COLUMN) - - return headers - - def _get_table_columns_width(self, headers: List[ColumnInfo]) -> List[int]: - header_width_size_cols = [] - for header in headers: - width_multiplier = 1 - if self.scan_type == SECRET_SCAN_TYPE: - width_multiplier = header.width_secret - elif self.scan_type == INFRA_CONFIGURATION_SCAN_TYPE: - width_multiplier = header.width_iac - elif self.scan_type == SAST_SCAN_TYPE: - width_multiplier = header.width_sast - - header_width_size_cols.append(len(header.name) * width_multiplier) - - return header_width_size_cols - - def _get_detection_row(self, detection: Detection, document: Document) -> List[str]: - return [ - *self._get_detection_summary_fields(detection, document.path), - *self._get_detection_code_segment_fields(detection, document), - ] - - def _get_detection_summary_fields(self, detection: Detection, document_path: str) -> List[str]: + def _enrich_table_with_detection_summary_values( + self, table: Table, detection: Detection, document: Document + ) -> None: issue_type = detection.message if self.scan_type == SECRET_SCAN_TYPE: issue_type = detection.type - rows = [ - issue_type, - detection.detection_rule_id, - click.format_filename(document_path), - ] + table.set(ISSUE_TYPE_COLUMN, issue_type) + table.set(RULE_ID_COLUMN, detection.detection_rule_id) + table.set(FILE_PATH_COLUMN, click.format_filename(document.path)) + table.set(SECRET_SHA_COLUMN, detection.detection_details.get('sha512', '')) + table.set(COMMIT_SHA_COLUMN, detection.detection_details.get('commit_id', '')) - if self.scan_type == SECRET_SCAN_TYPE: - rows.append(detection.detection_details.get('sha512', '')) - - if self._is_git_repository(): - rows.append(detection.detection_details.get('commit_id', '')) - - return rows - - def _get_detection_code_segment_fields(self, detection: Detection, document: Document) -> List[str]: + def _enrich_table_with_detection_code_segment_values( + self, table: Table, detection: Detection, document: Document + ) -> None: detection_details = detection.detection_details detection_line = detection_details.get('line_in_file', -1) if self.scan_type == SECRET_SCAN_TYPE: detection_line = detection_details.get('line', -1) - detection_position = get_position_in_line(document.content, detection_details.get('start_position', -1)) + detection_column = get_position_in_line(document.content, detection_details.get('start_position', -1)) violation_length = detection_details.get('length', -1) - rows = [ - detection_line, - detection_position, - ] + table.set(LINE_NUMBER_COLUMN, str(detection_line)) + table.set(COLUMN_NUMBER_COLUMN, str(detection_column)) - if self.scan_type == SECRET_SCAN_TYPE: - rows.append(f'{violation_length} chars') + table.set(VIOLATION_LENGTH_COLUMN, f'{violation_length} chars') - violation = '' - - file_content_lines = document.content.splitlines() - if detection_line < len(file_content_lines): - line = file_content_lines[detection_line] - violation = line[detection_position: detection_position + violation_length] + violation = '' + file_content_lines = document.content.splitlines() + if detection_line < len(file_content_lines): + line = file_content_lines[detection_line] + violation = line[detection_column: detection_column + violation_length] if not self.show_secret: violation = obfuscate_text(violation) - rows.append(violation) - - return rows + table.set(VIOLATION_COLUMN, violation) From 27c7d89c831f18b4923ed9378b2732ed6f039cd5 Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Mon, 12 Jun 2023 19:48:32 +0200 Subject: [PATCH 3/6] fix naming --- cycode/cli/printers/base_table_printer.py | 4 +- cycode/cli/printers/table_printer.py | 47 +++++++++++------------ 2 files changed, 24 insertions(+), 27 deletions(-) diff --git a/cycode/cli/printers/base_table_printer.py b/cycode/cli/printers/base_table_printer.py index 1161dfc3..e4ba4598 100644 --- a/cycode/cli/printers/base_table_printer.py +++ b/cycode/cli/printers/base_table_printer.py @@ -32,7 +32,7 @@ def add(self, column: ColumnInfo): self._columns[column] = list() def set(self, column: ColumnInfo, value: str): - # we pash values only for existing columns what were added before + # we push values only for existing columns what were added before if column in self._columns: self._columns[column].append(value) @@ -109,4 +109,4 @@ def _is_git_repository(self) -> bool: @abc.abstractmethod def _print_results(self, results: List[DocumentDetections]) -> None: - raise NotImplemented + raise NotImplementedError diff --git a/cycode/cli/printers/table_printer.py b/cycode/cli/printers/table_printer.py index 29fe4fe0..1e225190 100644 --- a/cycode/cli/printers/table_printer.py +++ b/cycode/cli/printers/table_printer.py @@ -9,48 +9,47 @@ # we are using indexes like 10 20 30 to have space between for future columns inserts -# optional -SECRET_SHA_COLUMN = ColumnInfo(name='Secret SHA', index=31, width_secret=2) -COMMIT_SHA_COLUMN = ColumnInfo(name='Commit SHA', index=32) -VIOLATION_LENGTH_COLUMN = ColumnInfo(name='Violation Length', index=51) -VIOLATION_COLUMN = ColumnInfo(name='Violation', index=52, width_secret=2) # required ISSUE_TYPE_COLUMN = ColumnInfo(name='Issue Type', index=10, width_secret=2, width_iac=4, width_sast=7) RULE_ID_COLUMN = ColumnInfo(name='Rule ID', index=20, width_secret=2, width_iac=3, width_sast=2) FILE_PATH_COLUMN = ColumnInfo(name='File Path', index=30, width_secret=2, width_iac=3, width_sast=3) LINE_NUMBER_COLUMN = ColumnInfo(name='Line Number', index=40) COLUMN_NUMBER_COLUMN = ColumnInfo(name='Column Number', index=50) +# optional +SECRET_SHA_COLUMN = ColumnInfo(name='Secret SHA', index=31, width_secret=2) +COMMIT_SHA_COLUMN = ColumnInfo(name='Commit SHA', index=32) +VIOLATION_LENGTH_COLUMN = ColumnInfo(name='Violation Length', index=51) +VIOLATION_COLUMN = ColumnInfo(name='Violation', index=52, width_secret=2) class TablePrinter(BaseTablePrinter): def _print_results(self, results: List[DocumentDetections]) -> None: table = self._get_table() - for detections in results: - for detection in detections.detections: - self._enrich_table_with_values(table, detection, detections.document) + for result in results: + for detection in result.detections: + self._enrich_table_with_values(table, detection, result.document) click.echo(table.get_table().draw()) def _get_table(self) -> Table: - """Returns map (under the hood) of table headers and their values""" - columns = Table(self.scan_type) + table = Table(self.scan_type) - columns.add(ISSUE_TYPE_COLUMN) - columns.add(RULE_ID_COLUMN) - columns.add(FILE_PATH_COLUMN) - columns.add(LINE_NUMBER_COLUMN) - columns.add(COLUMN_NUMBER_COLUMN) + table.add(ISSUE_TYPE_COLUMN) + table.add(RULE_ID_COLUMN) + table.add(FILE_PATH_COLUMN) + table.add(LINE_NUMBER_COLUMN) + table.add(COLUMN_NUMBER_COLUMN) if self._is_git_repository(): - columns.add(COMMIT_SHA_COLUMN) + table.add(COMMIT_SHA_COLUMN) if self.scan_type == SECRET_SCAN_TYPE: - columns.add(SECRET_SHA_COLUMN) - columns.add(VIOLATION_LENGTH_COLUMN) - columns.add(VIOLATION_COLUMN) + table.add(SECRET_SHA_COLUMN) + table.add(VIOLATION_LENGTH_COLUMN) + table.add(VIOLATION_COLUMN) - return columns + return table def _enrich_table_with_values(self, table: Table, detection: Detection, document: Document) -> None: self._enrich_table_with_detection_summary_values(table, detection, document) @@ -81,11 +80,6 @@ def _enrich_table_with_detection_code_segment_values( detection_column = get_position_in_line(document.content, detection_details.get('start_position', -1)) violation_length = detection_details.get('length', -1) - table.set(LINE_NUMBER_COLUMN, str(detection_line)) - table.set(COLUMN_NUMBER_COLUMN, str(detection_column)) - - table.set(VIOLATION_LENGTH_COLUMN, f'{violation_length} chars') - violation = '' file_content_lines = document.content.splitlines() if detection_line < len(file_content_lines): @@ -95,4 +89,7 @@ def _enrich_table_with_detection_code_segment_values( if not self.show_secret: violation = obfuscate_text(violation) + table.set(LINE_NUMBER_COLUMN, str(detection_line)) + table.set(COLUMN_NUMBER_COLUMN, str(detection_column)) + table.set(VIOLATION_LENGTH_COLUMN, f'{violation_length} chars') table.set(VIOLATION_COLUMN, violation) From 40d1c699c594f5ee19b64d595d012a0839f9da26 Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Tue, 13 Jun 2023 10:17:56 +0200 Subject: [PATCH 4/6] code refactoring --- cycode/cli/printers/base_table_printer.py | 71 +---------------------- cycode/cli/printers/column_info.py | 9 +++ cycode/cli/printers/table.py | 66 +++++++++++++++++++++ cycode/cli/printers/table_printer.py | 26 ++++----- 4 files changed, 88 insertions(+), 84 deletions(-) create mode 100644 cycode/cli/printers/column_info.py create mode 100644 cycode/cli/printers/table.py diff --git a/cycode/cli/printers/base_table_printer.py b/cycode/cli/printers/base_table_printer.py index e4ba4598..112a1d4a 100644 --- a/cycode/cli/printers/base_table_printer.py +++ b/cycode/cli/printers/base_table_printer.py @@ -1,82 +1,13 @@ import abc -from typing import List, NamedTuple, Dict, Optional +from typing import List import click -from texttable import Texttable -from cycode.cli.consts import SECRET_SCAN_TYPE, INFRA_CONFIGURATION_SCAN_TYPE, SAST_SCAN_TYPE from cycode.cli.printers.text_printer import TextPrinter from cycode.cli.models import DocumentDetections, CliError, CliResult from cycode.cli.printers.base_printer import BasePrinter -class ColumnInfo(NamedTuple): - name: str - index: int # lower index means left column - width_secret: int = 1 - width_sast: int = 1 - width_iac: int = 1 - - -class Table: - """Helper class to manage columns and their values in the right order and only if the column should be presented.""" - - def __init__(self, scan_type: str, columns_info: Optional[List[ColumnInfo]] = None): - self.scan_type = scan_type - - self._columns: Dict[ColumnInfo, List[str]] = dict() - if columns_info: - self._columns: Dict[ColumnInfo, List[str]] = {columns: list() for columns in columns_info} - - def add(self, column: ColumnInfo): - self._columns[column] = list() - - def set(self, column: ColumnInfo, value: str): - # we push values only for existing columns what were added before - if column in self._columns: - self._columns[column].append(value) - - def _get_ordered_columns(self) -> List[ColumnInfo]: - # we are sorting columns by index to make sure that columns will be printed in the right order - return sorted(self._columns, key=lambda column_info: column_info.index) - - def get_columns_info(self) -> List[ColumnInfo]: - return self._get_ordered_columns() - - def get_headers(self) -> List[str]: - return [header.name for header in self._get_ordered_columns()] - - def get_rows(self) -> List[str]: - ordered_values = [self._columns[column_info] for column_info in self._get_ordered_columns()] - return list(zip(*ordered_values)) - - def _get_table_columns_width(self) -> List[int]: - header_width_size_cols = [] - for header in self.get_columns_info(): - width_multiplier = 1 - if self.scan_type == SECRET_SCAN_TYPE: - width_multiplier = header.width_secret - elif self.scan_type == INFRA_CONFIGURATION_SCAN_TYPE: - width_multiplier = header.width_iac - elif self.scan_type == SAST_SCAN_TYPE: - width_multiplier = header.width_sast - - header_width_size_cols.append(len(header.name) * width_multiplier) - - return header_width_size_cols - - def get_table(self, max_width: int = 80) -> Texttable: - table = Texttable(max_width) - table.header(self.get_headers()) - - for row in self.get_rows(): - table.add_row(row) - - table.set_cols_width(self._get_table_columns_width()) - - return table - - class BaseTablePrinter(BasePrinter, abc.ABC): def __init__(self, context: click.Context): super().__init__(context) diff --git a/cycode/cli/printers/column_info.py b/cycode/cli/printers/column_info.py new file mode 100644 index 00000000..b44f2ece --- /dev/null +++ b/cycode/cli/printers/column_info.py @@ -0,0 +1,9 @@ +from typing import NamedTuple + + +class ColumnInfo(NamedTuple): + name: str + index: int # Represents the order of the columns, starting from the left + width_secret: int = 1 + width_sast: int = 1 + width_iac: int = 1 diff --git a/cycode/cli/printers/table.py b/cycode/cli/printers/table.py new file mode 100644 index 00000000..f7bb6a79 --- /dev/null +++ b/cycode/cli/printers/table.py @@ -0,0 +1,66 @@ +from typing import List, Dict, Optional, TYPE_CHECKING +from texttable import Texttable + +from cycode.cli.consts import SECRET_SCAN_TYPE, INFRA_CONFIGURATION_SCAN_TYPE, SAST_SCAN_TYPE + +if TYPE_CHECKING: + from cycode.cli.printers.column_info import ColumnInfo + + +class Table: + """Helper class to manage columns and their values in the right order and only if the column should be presented.""" + + def __init__(self, scan_type: str, column_infos: Optional[List['ColumnInfo']] = None): + self.scan_type = scan_type + + self._columns: Dict['ColumnInfo', List[str]] = dict() + if column_infos: + self._columns: Dict['ColumnInfo', List[str]] = {columns: list() for columns in column_infos} + + def add(self, column: 'ColumnInfo'): + self._columns[column] = list() + + def set(self, column: 'ColumnInfo', value: str): + # we push values only for existing columns what were added before + if column in self._columns: + self._columns[column].append(value) + + def _get_ordered_columns(self) -> List['ColumnInfo']: + # we are sorting columns by index to make sure that columns will be printed in the right order + return sorted(self._columns, key=lambda column_info: column_info.index) + + def get_columns_info(self) -> List['ColumnInfo']: + return self._get_ordered_columns() + + def get_headers(self) -> List[str]: + return [header.name for header in self._get_ordered_columns()] + + def get_rows(self) -> List[str]: + column_values = [self._columns[column_info] for column_info in self._get_ordered_columns()] + return list(zip(*column_values)) + + def _get_table_columns_width(self) -> List[int]: + header_width_size_cols = [] + for header in self.get_columns_info(): + width_multiplier = 1 + if self.scan_type == SECRET_SCAN_TYPE: + width_multiplier = header.width_secret + elif self.scan_type == INFRA_CONFIGURATION_SCAN_TYPE: + width_multiplier = header.width_iac + elif self.scan_type == SAST_SCAN_TYPE: + width_multiplier = header.width_sast + + header_width_size_cols.append(len(header.name) * width_multiplier) + + return header_width_size_cols + + def get_table(self, max_width: int = 80) -> Texttable: + table = Texttable(max_width) + table.header(self.get_headers()) + + for row in self.get_rows(): + table.add_row(row) + + table.set_cols_width(self._get_table_columns_width()) + + return table diff --git a/cycode/cli/printers/table_printer.py b/cycode/cli/printers/table_printer.py index 1e225190..b7b1cf4a 100644 --- a/cycode/cli/printers/table_printer.py +++ b/cycode/cli/printers/table_printer.py @@ -2,24 +2,22 @@ import click -from cycode.cli.printers.base_table_printer import BaseTablePrinter, ColumnInfo, Table +from cycode.cli.printers.base_table_printer import BaseTablePrinter +from cycode.cli.printers.column_info import ColumnInfo +from cycode.cli.printers.table import Table from cycode.cli.utils.string_utils import obfuscate_text, get_position_in_line from cycode.cli.consts import SECRET_SCAN_TYPE from cycode.cli.models import DocumentDetections, Detection, Document -# we are using indexes like 10 20 30 to have space between for future columns inserts - -# required -ISSUE_TYPE_COLUMN = ColumnInfo(name='Issue Type', index=10, width_secret=2, width_iac=4, width_sast=7) -RULE_ID_COLUMN = ColumnInfo(name='Rule ID', index=20, width_secret=2, width_iac=3, width_sast=2) -FILE_PATH_COLUMN = ColumnInfo(name='File Path', index=30, width_secret=2, width_iac=3, width_sast=3) -LINE_NUMBER_COLUMN = ColumnInfo(name='Line Number', index=40) -COLUMN_NUMBER_COLUMN = ColumnInfo(name='Column Number', index=50) -# optional -SECRET_SHA_COLUMN = ColumnInfo(name='Secret SHA', index=31, width_secret=2) -COMMIT_SHA_COLUMN = ColumnInfo(name='Commit SHA', index=32) -VIOLATION_LENGTH_COLUMN = ColumnInfo(name='Violation Length', index=51) -VIOLATION_COLUMN = ColumnInfo(name='Violation', index=52, width_secret=2) +ISSUE_TYPE_COLUMN = ColumnInfo(name='Issue Type', index=1, width_secret=2, width_iac=4, width_sast=7) +RULE_ID_COLUMN = ColumnInfo(name='Rule ID', index=2, width_secret=2, width_iac=3, width_sast=2) +FILE_PATH_COLUMN = ColumnInfo(name='File Path', index=3, width_secret=2, width_iac=3, width_sast=3) +SECRET_SHA_COLUMN = ColumnInfo(name='Secret SHA', index=3, width_secret=2) +COMMIT_SHA_COLUMN = ColumnInfo(name='Commit SHA', index=4) +LINE_NUMBER_COLUMN = ColumnInfo(name='Line Number', index=5) +COLUMN_NUMBER_COLUMN = ColumnInfo(name='Column Number', index=6) +VIOLATION_LENGTH_COLUMN = ColumnInfo(name='Violation Length', index=7) +VIOLATION_COLUMN = ColumnInfo(name='Violation', index=8, width_secret=2) class TablePrinter(BaseTablePrinter): From 1d4dcac416c98c0e72c10d06461e65e60ae7b92f Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Tue, 13 Jun 2023 11:44:14 +0200 Subject: [PATCH 5/6] rework management of column's widths --- cycode/cli/printers/column_info.py | 9 ------ cycode/cli/printers/table.py | 31 ++++++++----------- cycode/cli/printers/table_models.py | 20 ++++++++++++ cycode/cli/printers/table_printer.py | 46 ++++++++++++++++++++-------- 4 files changed, 67 insertions(+), 39 deletions(-) delete mode 100644 cycode/cli/printers/column_info.py create mode 100644 cycode/cli/printers/table_models.py diff --git a/cycode/cli/printers/column_info.py b/cycode/cli/printers/column_info.py deleted file mode 100644 index b44f2ece..00000000 --- a/cycode/cli/printers/column_info.py +++ /dev/null @@ -1,9 +0,0 @@ -from typing import NamedTuple - - -class ColumnInfo(NamedTuple): - name: str - index: int # Represents the order of the columns, starting from the left - width_secret: int = 1 - width_sast: int = 1 - width_iac: int = 1 diff --git a/cycode/cli/printers/table.py b/cycode/cli/printers/table.py index f7bb6a79..3677ec05 100644 --- a/cycode/cli/printers/table.py +++ b/cycode/cli/printers/table.py @@ -1,26 +1,24 @@ from typing import List, Dict, Optional, TYPE_CHECKING from texttable import Texttable -from cycode.cli.consts import SECRET_SCAN_TYPE, INFRA_CONFIGURATION_SCAN_TYPE, SAST_SCAN_TYPE - if TYPE_CHECKING: - from cycode.cli.printers.column_info import ColumnInfo + from cycode.cli.printers.table_models import ColumnInfo, ColumnWidths class Table: """Helper class to manage columns and their values in the right order and only if the column should be presented.""" - def __init__(self, scan_type: str, column_infos: Optional[List['ColumnInfo']] = None): - self.scan_type = scan_type + def __init__(self, column_infos: Optional[List['ColumnInfo']] = None): + self._column_widths = None self._columns: Dict['ColumnInfo', List[str]] = dict() if column_infos: self._columns: Dict['ColumnInfo', List[str]] = {columns: list() for columns in column_infos} - def add(self, column: 'ColumnInfo'): + def add(self, column: 'ColumnInfo') -> None: self._columns[column] = list() - def set(self, column: 'ColumnInfo', value: str): + def set(self, column: 'ColumnInfo', value: str) -> None: # we push values only for existing columns what were added before if column in self._columns: self._columns[column].append(value) @@ -39,20 +37,16 @@ def get_rows(self) -> List[str]: column_values = [self._columns[column_info] for column_info in self._get_ordered_columns()] return list(zip(*column_values)) - def _get_table_columns_width(self) -> List[int]: - header_width_size_cols = [] + def set_cols_width(self, column_widths: 'ColumnWidths') -> None: + header_width_size = [] for header in self.get_columns_info(): width_multiplier = 1 - if self.scan_type == SECRET_SCAN_TYPE: - width_multiplier = header.width_secret - elif self.scan_type == INFRA_CONFIGURATION_SCAN_TYPE: - width_multiplier = header.width_iac - elif self.scan_type == SAST_SCAN_TYPE: - width_multiplier = header.width_sast + if header in column_widths: + width_multiplier = column_widths[header] - header_width_size_cols.append(len(header.name) * width_multiplier) + header_width_size.append(len(header.name) * width_multiplier) - return header_width_size_cols + self._column_widths = header_width_size def get_table(self, max_width: int = 80) -> Texttable: table = Texttable(max_width) @@ -61,6 +55,7 @@ def get_table(self, max_width: int = 80) -> Texttable: for row in self.get_rows(): table.add_row(row) - table.set_cols_width(self._get_table_columns_width()) + if self._column_widths: + table.set_cols_width(self._column_widths) return table diff --git a/cycode/cli/printers/table_models.py b/cycode/cli/printers/table_models.py new file mode 100644 index 00000000..34859e06 --- /dev/null +++ b/cycode/cli/printers/table_models.py @@ -0,0 +1,20 @@ +from typing import NamedTuple, Dict + + +class ColumnInfoBuilder: + _index = 0 + + @staticmethod + def build(name: str) -> 'ColumnInfo': + column_info = ColumnInfo(name, ColumnInfoBuilder._index) + ColumnInfoBuilder._index += 1 + return column_info + + +class ColumnInfo(NamedTuple): + name: str + index: int # Represents the order of the columns, starting from the left + + +ColumnWidths = Dict[ColumnInfo, int] +ColumnWidthsConfig = Dict[str, ColumnWidths] diff --git a/cycode/cli/printers/table_printer.py b/cycode/cli/printers/table_printer.py index b7b1cf4a..801c9c09 100644 --- a/cycode/cli/printers/table_printer.py +++ b/cycode/cli/printers/table_printer.py @@ -3,26 +3,48 @@ import click from cycode.cli.printers.base_table_printer import BaseTablePrinter -from cycode.cli.printers.column_info import ColumnInfo +from cycode.cli.printers.table_models import ColumnInfoBuilder, ColumnWidthsConfig from cycode.cli.printers.table import Table from cycode.cli.utils.string_utils import obfuscate_text, get_position_in_line -from cycode.cli.consts import SECRET_SCAN_TYPE +from cycode.cli.consts import SECRET_SCAN_TYPE, INFRA_CONFIGURATION_SCAN_TYPE, SAST_SCAN_TYPE from cycode.cli.models import DocumentDetections, Detection, Document -ISSUE_TYPE_COLUMN = ColumnInfo(name='Issue Type', index=1, width_secret=2, width_iac=4, width_sast=7) -RULE_ID_COLUMN = ColumnInfo(name='Rule ID', index=2, width_secret=2, width_iac=3, width_sast=2) -FILE_PATH_COLUMN = ColumnInfo(name='File Path', index=3, width_secret=2, width_iac=3, width_sast=3) -SECRET_SHA_COLUMN = ColumnInfo(name='Secret SHA', index=3, width_secret=2) -COMMIT_SHA_COLUMN = ColumnInfo(name='Commit SHA', index=4) -LINE_NUMBER_COLUMN = ColumnInfo(name='Line Number', index=5) -COLUMN_NUMBER_COLUMN = ColumnInfo(name='Column Number', index=6) -VIOLATION_LENGTH_COLUMN = ColumnInfo(name='Violation Length', index=7) -VIOLATION_COLUMN = ColumnInfo(name='Violation', index=8, width_secret=2) +# Creation must have strict order. Represents the order of the columns in the table (from left to right) +ISSUE_TYPE_COLUMN = ColumnInfoBuilder.build(name='Issue Type') +RULE_ID_COLUMN = ColumnInfoBuilder.build(name='Rule ID') +FILE_PATH_COLUMN = ColumnInfoBuilder.build(name='File Path') +SECRET_SHA_COLUMN = ColumnInfoBuilder.build(name='Secret SHA') +COMMIT_SHA_COLUMN = ColumnInfoBuilder.build(name='Commit SHA') +LINE_NUMBER_COLUMN = ColumnInfoBuilder.build(name='Line Number') +COLUMN_NUMBER_COLUMN = ColumnInfoBuilder.build(name='Column Number') +VIOLATION_LENGTH_COLUMN = ColumnInfoBuilder.build(name='Violation Length') +VIOLATION_COLUMN = ColumnInfoBuilder.build(name='Violation') + +COLUMN_WIDTHS_CONFIG: ColumnWidthsConfig = { + SECRET_SCAN_TYPE: { + ISSUE_TYPE_COLUMN: 2, + RULE_ID_COLUMN: 2, + FILE_PATH_COLUMN: 2, + SECRET_SHA_COLUMN: 2, + VIOLATION_COLUMN: 2, + }, + INFRA_CONFIGURATION_SCAN_TYPE: { + ISSUE_TYPE_COLUMN: 4, + RULE_ID_COLUMN: 3, + FILE_PATH_COLUMN: 3, + }, + SAST_SCAN_TYPE: { + ISSUE_TYPE_COLUMN: 7, + RULE_ID_COLUMN: 2, + FILE_PATH_COLUMN: 3, + }, +} class TablePrinter(BaseTablePrinter): def _print_results(self, results: List[DocumentDetections]) -> None: table = self._get_table() + table.set_cols_width(COLUMN_WIDTHS_CONFIG[self.scan_type]) for result in results: for detection in result.detections: @@ -31,7 +53,7 @@ def _print_results(self, results: List[DocumentDetections]) -> None: click.echo(table.get_table().draw()) def _get_table(self) -> Table: - table = Table(self.scan_type) + table = Table() table.add(ISSUE_TYPE_COLUMN) table.add(RULE_ID_COLUMN) From 530689de5f66d63df4fa054b76afad05dca4d063 Mon Sep 17 00:00:00 2001 From: Ilya Siamionau Date: Tue, 13 Jun 2023 11:46:58 +0200 Subject: [PATCH 6/6] fix possible exception --- cycode/cli/printers/table_printer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cycode/cli/printers/table_printer.py b/cycode/cli/printers/table_printer.py index 801c9c09..353ce903 100644 --- a/cycode/cli/printers/table_printer.py +++ b/cycode/cli/printers/table_printer.py @@ -44,7 +44,8 @@ class TablePrinter(BaseTablePrinter): def _print_results(self, results: List[DocumentDetections]) -> None: table = self._get_table() - table.set_cols_width(COLUMN_WIDTHS_CONFIG[self.scan_type]) + if self.scan_type in COLUMN_WIDTHS_CONFIG: + table.set_cols_width(COLUMN_WIDTHS_CONFIG[self.scan_type]) for result in results: for detection in result.detections: