diff --git a/tests/integration/source_code/solacc-malformed.txt b/tests/integration/source_code/solacc-malformed.txt new file mode 100644 index 0000000000..0ae0e976a8 --- /dev/null +++ b/tests/integration/source_code/solacc-malformed.txt @@ -0,0 +1,21 @@ +# 3 failures are genuinely malformed code +# the following fails because of incorrect predicate syntax: if start_over = "yes": +user-behavior-analytics-for-cloud-services/Anomaly Detection Pipeline/0 - Ingest and Explore Raw User Data.py +# the following fails to parse due to module dependency azure/devops/v5_1/client_factory.py +edge-ml-for-manufacturing/03_Trigger_AzureDevOps_Job.py +# the following fails to parse due to incomplete assignment 'fhir_file_location=' +hls-interop-workshop-jan23/fhir.py + +# 6 failures fail because of lack of automagic support +# the following fails because of unsupported automagic line: pip install git+https://github.com/databricks-industry-solutions/dbignite.git@feature-FHIR-schema-dbignite-HLSSA-294 +dbignite-forked/notebooks/fhir-mapping-demo.py +# the following fails because of unsupported automagic line: sudo apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/3bf863cc.pub +hls-llm-doc-qa/util/install-llm-libraries.py +# the following fails because of unsupported automagic line: pip install --upgrade google-auth gcsfs +security-analysis-tool/notebooks/Setup/3. 
def lint_one(file: Path, ctx: LocalCheckoutContext, unparsed: Path | None) -> tuple[set[str], int]:
    """Lint a single file and report its outcome.

    Writes each advice to stdout and returns a tuple of
    (names of imports the linter could not resolve, 1 if the file parsed else 0).
    When parsing fails and *unparsed* is given, the file's path is appended to
    that report file; when *unparsed* is None (single-file troubleshooting run),
    the full traceback is logged instead.
    """
    not_found: set[str] = set()
    try:
        for located_advice in ctx.local_code_linter.lint_path(file):
            advice = located_advice.advice
            if advice.code == 'import-not-found':
                # message format is '<code>: <module name>' — keep only the name
                not_found.add(advice.message.split(':')[1].strip())
            rendered = located_advice.message_relative_to(dist.parent, default=file)
            sys.stdout.write(f"{rendered}\n")
    except Exception as e:  # pylint: disable=broad-except
        # here we're most likely catching astroid & sqlglot errors
        details = f"Error during parsing of {file}: {e}".replace("\n", " ")
        if unparsed is None:  # linting single file, log exception details
            logger.error(details, exc_info=e)
        else:
            logger.error(details)
            # populate solacc-unparsed.txt
            with unparsed.open(mode="a", encoding="utf-8") as f:
                f.write(file.relative_to(dist).as_posix())
                f.write("\n")
        return set(), 0
    return not_found, 1
def lint_all(file_to_lint: str | None):
    """Lint every cloned solution accelerator, or a single file when given.

    Prints advice to stdout, logs aggregate stats (skipped files, parseable
    percentage, a histogram of missing imports) and exits with status 1 when
    any linted file fails to parse.
    """
    ws = WorkspaceClient(host='...', token='...')
    ctx = LocalCheckoutContext(ws).replace(
        linter_context_factory=lambda session_state: LinterContext(MigrationIndex([]), session_state)
    )
    parseable = 0
    missing_imports: dict[str, int] = {}
    all_files = list(dist.glob('**/*.py')) if file_to_lint is None else [Path(dist, file_to_lint)]
    # full runs rebuild solacc-unparsed.txt from scratch
    unparsed: Path | None = None
    if file_to_lint is None:
        unparsed = Path(Path(__file__).parent, "solacc-unparsed.txt")
        if unparsed.exists():
            os.remove(unparsed)
    # files known to be genuinely malformed are excluded from linting and stats
    skipped: set[str] | None = None
    malformed = Path(__file__).parent / "solacc-malformed.txt"
    if file_to_lint is None and malformed.exists():
        lines = malformed.read_text(encoding="utf-8").split("\n")
        skipped = set(line for line in lines if len(line) > 0 and not line.startswith("#"))
    # BUGFIX: count files actually skipped rather than skip-list entries; an
    # entry whose repo failed to clone would otherwise skew the stats below
    skipped_count = 0
    for file in all_files:
        if skipped and file.relative_to(dist).as_posix() in skipped:
            skipped_count += 1
            continue
        _missing_imports, _parseable = lint_one(file, ctx, unparsed)
        for _import in _missing_imports:
            missing_imports[_import] = missing_imports.get(_import, 0) + 1
        parseable += _parseable
    all_files_len = len(all_files) - skipped_count
    # BUGFIX: guard against ZeroDivisionError when no files were linted at all
    parseable_pct = int(parseable / all_files_len * 100) if all_files_len > 0 else 100
    logger.info(
        f"Skipped: {skipped_count}, parseable: {parseable_pct}% ({parseable}/{all_files_len}), missing imports: {sum(missing_imports.values())}"
    )
    # report the most frequent missing imports first
    for key, value in sorted(missing_imports.items(), key=lambda item: item[1], reverse=True):
        logger.info(f"Missing import '{key}': {value} occurrences")
    # fail the job if files are unparseable
    if parseable_pct < 100:
        sys.exit(1)


def main(args: list[str]):
    """CLI entry point: an optional first argument names one file to lint.

    When a file is given, cloning is skipped — the assumption is that we are
    troubleshooting an already-cloned checkout.
    """
    install_logger()
    logging.root.setLevel(logging.INFO)
    file_to_lint = args[1] if len(args) > 1 else None
    if not file_to_lint:
        # don't clone if linting just one file, assumption is we're troubleshooting
        logger.info("Cloning...")
        clone_all()
    logger.info("Linting...")
    lint_all(file_to_lint)


if __name__ == "__main__":
    main(sys.argv)