diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index dad1275..75749ed 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -28,6 +28,8 @@ jobs: steps: - name: Checkout repository uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + submodules: true - name: Log in to the Container registry if: ${{ github.event_name != 'pull_request' }} diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 281924b..42ca1da 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -22,6 +22,8 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 + with: + submodules: true - name: Set up Python uses: actions/setup-python@v5 with: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f6cf5ab..4b0ad96 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,6 +18,8 @@ jobs: steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + submodules: true - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@8d9ed9ac5c53483de85588cdf95a591a75ab9f55 # v5.5.0 @@ -53,6 +55,8 @@ jobs: steps: - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + submodules: true - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@8d9ed9ac5c53483de85588cdf95a591a75ab9f55 # v5.5.0 diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..d1ca67e --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "src/semgrep_mcp/semgrep_interfaces"] + path = src/semgrep_mcp/semgrep_interfaces + url = https://github.com/semgrep/semgrep-interfaces.git diff --git a/pyproject.toml b/pyproject.toml index 0048bf4..ba51efd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -100,3 +100,8 @@ max-complexity = 10 packages = [ "src/semgrep_mcp", ] + +[tool.pyright] +exclude = [ + "src/semgrep_mcp/semgrep_interfaces" +] diff 
import asyncio
import json
import os
import subprocess
from typing import Any

from mcp.shared.exceptions import McpError
from mcp.types import INTERNAL_ERROR, ErrorData

from semgrep_mcp.models import CodeFile
from semgrep_mcp.semgrep_interfaces.semgrep_output_v1 import CliOutput

################################################################################
# Prelude
################################################################################
# Communicating with the Semgrep binary.

################################################################################
# Constants
################################################################################

# Serializes discovery of the semgrep executable between asyncio tasks.
_SEMGREP_LOCK = asyncio.Lock()

# Global variable to store the semgrep executable path
SEMGREP_EXECUTABLE: str | None = None

# asyncio's StreamReader rejects any line longer than its buffer limit
# (64 KiB by default). RPC responses arrive as a single JSON line and can be
# far larger than that, so the pipes are created with a generous limit.
_STREAM_LIMIT = 64 * 1024 * 1024

################################################################################
# Finding Semgrep
################################################################################


def _semgrep_runs(candidate: str) -> bool:
    """Return True if ``candidate --version`` executes and exits with status 0."""
    try:
        subprocess.run([candidate, "--version"], check=True, capture_output=True, text=True)
    except (subprocess.SubprocessError, OSError):
        # OSError covers FileNotFoundError as well as e.g. PermissionError for
        # a file that exists but is not executable; either way this candidate
        # is unusable and the caller should move on to the next one.
        return False
    return True


# Semgrep utilities
def find_semgrep_path() -> str | None:
    """
    Dynamically find semgrep in PATH or common installation directories

    Note: this probes candidates with blocking ``subprocess.run`` calls.

    Returns: Path to semgrep executable or None if not found
    """
    # Common paths where semgrep might be installed
    common_paths = [
        "semgrep",  # Default PATH
        "/usr/local/bin/semgrep",
        "/usr/bin/semgrep",
        "/opt/homebrew/bin/semgrep",  # Homebrew on macOS
        "/opt/semgrep/bin/semgrep",
        "/home/linuxbrew/.linuxbrew/bin/semgrep",  # Homebrew on Linux
        "/snap/bin/semgrep",  # Snap on Linux
    ]

    # Add Windows paths if on Windows
    if os.name == "nt":
        app_data = os.environ.get("APPDATA", "")
        if app_data:
            common_paths.extend(
                [
                    os.path.join(app_data, "Python", "Scripts", "semgrep.exe"),
                    os.path.join(app_data, "npm", "semgrep.cmd"),
                ]
            )

    for semgrep_path in common_paths:
        # For absolute paths, skip the (comparatively expensive) process spawn
        # when the file is not there. The bare "semgrep" entry is resolved
        # through PATH by the OS, so it is always probed.
        if os.path.isabs(semgrep_path) and not os.path.exists(semgrep_path):
            continue

        if _semgrep_runs(semgrep_path):
            return semgrep_path

    return None


async def ensure_semgrep_available() -> str:
    """
    Ensures semgrep is available and caches its path in SEMGREP_EXECUTABLE.

    Discovery is guarded by an asyncio lock so that concurrent tasks trigger at
    most one search, and runs in a worker thread so that the blocking
    ``semgrep --version`` probes do not stall the event loop.

    Returns:
        Path to semgrep executable

    Raises:
        McpError: If semgrep is not installed or not found
    """
    global SEMGREP_EXECUTABLE

    # Fast path - check if we already have the path
    if SEMGREP_EXECUTABLE:
        return SEMGREP_EXECUTABLE

    # Slow path - acquire lock and find semgrep
    async with _SEMGREP_LOCK:
        # Another task may have completed discovery while we were waiting.
        if SEMGREP_EXECUTABLE:
            return SEMGREP_EXECUTABLE

        semgrep_path = await asyncio.to_thread(find_semgrep_path)

        if not semgrep_path:
            raise McpError(
                ErrorData(
                    code=INTERNAL_ERROR,
                    message="Semgrep is not installed or not in your PATH. "
                    "Please install Semgrep manually before using this tool. "
                    "Installation options: "
                    "pip install semgrep, "
                    "macOS: brew install semgrep, "
                    "Or refer to https://semgrep.dev/docs/getting-started/",
                )
            )

        # Store the path for future use
        SEMGREP_EXECUTABLE = semgrep_path
        return semgrep_path


def set_semgrep_executable(semgrep_path: str) -> None:
    """Override the cached semgrep executable path, bypassing discovery."""
    global SEMGREP_EXECUTABLE
    SEMGREP_EXECUTABLE = semgrep_path


################################################################################
# Communicating with Semgrep over RPC
################################################################################


class SemgrepContext:
    """
    Line-oriented request/response channel to a running semgrep subprocess.

    Each request is written to the process's stdin as one line and the reply is
    read back from its stdout as one line.
    """

    process: asyncio.subprocess.Process
    stdin: asyncio.StreamWriter
    stdout: asyncio.StreamReader

    def __init__(self, process: asyncio.subprocess.Process) -> None:
        """
        Args:
            process: Subprocess started with both stdin and stdout piped.

        Raises:
            McpError: If the process has no stdin or stdout pipe.
        """
        if process.stdin is None or process.stdout is None:
            raise McpError(
                ErrorData(
                    code=INTERNAL_ERROR,
                    message="Semgrep process stdin/stdout not available",
                )
            )

        self.process = process
        self.stdin = process.stdin
        self.stdout = process.stdout
        # Replies carry no request id, so a write and its matching read must not
        # interleave with another task's request on the same pipe.
        self._request_lock = asyncio.Lock()

    async def communicate(self, line: str) -> str:
        """
        Send one line to the process and return the next line it prints.

        Args:
            line: Request text, without the trailing newline.

        Returns:
            The decoded response line (including its trailing newline, if any).

        Raises:
            McpError: If the process closed stdout without answering.
        """
        async with self._request_lock:
            self.stdin.write(f"{line}\n".encode())
            await self.stdin.drain()
            response = await self.stdout.readline()

        # readline() returns b"" only at EOF, i.e. the process went away.
        if not response:
            raise McpError(
                ErrorData(
                    code=INTERNAL_ERROR,
                    message="Semgrep process closed its output before responding",
                )
            )

        return response.decode()

    async def send_request(self, request: str, **kwargs: Any) -> str:
        """
        Send a JSON request of the form ``{"method": request, **kwargs}``.

        Returns:
            The raw response line from the process.
        """
        payload = {"method": request, **kwargs}

        return await self.communicate(json.dumps(payload))


################################################################################
# Running Semgrep
################################################################################


async def run_semgrep_process(
    args: list[str], *, capture_stderr: bool = False
) -> asyncio.subprocess.Process:
    """
    Runs semgrep with the given arguments as a subprocess, without waiting for it to finish.

    Args:
        args: List of command arguments
        capture_stderr: When True, stderr is piped so the caller can read it.
            When False (the default), stderr is inherited from this process so
            that semgrep's logs show up in the server logs.

    Returns:
        The started process, with stdin and stdout piped.

    Raises:
        McpError: If semgrep is not installed or not found
    """

    # Ensure semgrep is available
    semgrep_path = await ensure_semgrep_available()

    # Just so we get the debug logs for the MCP server
    env = os.environ.copy()
    env["SEMGREP_LOG_SRCS"] = "mcp"

    # Execute semgrep command
    return await asyncio.create_subprocess_exec(
        semgrep_path,
        *args,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE if capture_stderr else None,
        limit=_STREAM_LIMIT,
        env=env,
    )


async def run_semgrep(args: list[str]) -> str:
    """
    Runs semgrep with the given arguments

    Args:
        args: List of command arguments

    Returns:
        Output of semgrep command

    Raises:
        McpError: If semgrep cannot be found or exits with a non-zero status
    """

    # stderr has to be piped here: with an inherited stderr, communicate()
    # yields None for it and the error message below could not be built.
    process = await run_semgrep_process(args, capture_stderr=True)

    stdout, stderr = await process.communicate()

    if process.returncode != 0:
        stderr_text = (stderr or b"").decode()
        raise McpError(
            ErrorData(
                code=INTERNAL_ERROR,
                message=f"Error running semgrep: ({process.returncode}) {stderr_text}",
            )
        )

    return stdout.decode()


async def run_semgrep_via_rpc(context: SemgrepContext, data: list[CodeFile]) -> CliOutput:
    """
    Scans the given code files via RPC against a running semgrep process

    Args:
        context: Channel to the running semgrep process
        data: List of code files to scan

    Returns:
        The parsed CliOutput of the scan

    Raises:
        McpError: If the process does not answer, or the decoded response is
            not a JSON object
    """

    files_json = [
        {"file": code_file.filename, "content": code_file.content} for code_file in data
    ]

    # ATD serialized value
    resp = await context.send_request("scanFiles", files=files_json)

    # The JSON we get is double encoded, looks like
    # '"{"results": ..., ...}"'
    # so we have to load it twice
    encoded = json.loads(resp)
    resp_json = json.loads(encoded)

    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if not isinstance(resp_json, dict):
        raise McpError(
            ErrorData(
                code=INTERNAL_ERROR,
                message=f"Unexpected semgrep RPC response type: {type(resp_json).__name__}",
            )
        )

    return CliOutput.from_json(resp_json)
b/src/semgrep_mcp/server.py index 3db83eb..58eb37c 100755 --- a/src/semgrep_mcp/server.py +++ b/src/semgrep_mcp/server.py @@ -1,15 +1,15 @@ #!/usr/bin/env python3 -import asyncio import os import shutil -import subprocess import tempfile +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager from pathlib import Path from typing import Any import click import httpx -from mcp.server.fastmcp import FastMCP +from mcp.server.fastmcp import Context, FastMCP from mcp.shared.exceptions import McpError from mcp.types import ( INTERNAL_ERROR, @@ -21,6 +21,14 @@ from starlette.responses import JSONResponse from semgrep_mcp.models import CodeFile, Finding, LocalCodeFile, SemgrepScanResult +from semgrep_mcp.semgrep import ( + SemgrepContext, + run_semgrep, + run_semgrep_process, + run_semgrep_via_rpc, + set_semgrep_executable, +) +from semgrep_mcp.semgrep_interfaces.semgrep_output_v1 import CliOutput # --------------------------------------------------------------------------------- # Constants @@ -52,14 +60,9 @@ # Global Variables # --------------------------------------------------------------------------------- -# Global variable to store the semgrep executable path -semgrep_executable: str | None = None -_semgrep_lock = asyncio.Lock() - # Global variable to cache deployment slug DEPLOYMENT_SLUG: str | None = None - # --------------------------------------------------------------------------------- # Utilities # --------------------------------------------------------------------------------- @@ -131,102 +134,6 @@ def validate_config(config: str | None = None) -> str: return validate_absolute_path(config, "config") -# Semgrep utilities -def find_semgrep_path() -> str | None: - """ - Dynamically find semgrep in PATH or common installation directories - Returns: Path to semgrep executable or None if not found - """ - # Common paths where semgrep might be installed - common_paths = [ - "semgrep", # Default PATH - "/usr/local/bin/semgrep", - 
"/usr/bin/semgrep", - "/opt/homebrew/bin/semgrep", # Homebrew on macOS - "/opt/semgrep/bin/semgrep", - "/home/linuxbrew/.linuxbrew/bin/semgrep", # Homebrew on Linux - "/snap/bin/semgrep", # Snap on Linux - ] - - # Add Windows paths if on Windows - if os.name == "nt": - app_data = os.environ.get("APPDATA", "") - if app_data: - common_paths.extend( - [ - os.path.join(app_data, "Python", "Scripts", "semgrep.exe"), - os.path.join(app_data, "npm", "semgrep.cmd"), - ] - ) - - # Try each path - for semgrep_path in common_paths: - # For 'semgrep' (without path), check if it's in PATH - if semgrep_path == "semgrep": - try: - subprocess.run( - [semgrep_path, "--version"], check=True, capture_output=True, text=True - ) - return semgrep_path - except (subprocess.SubprocessError, FileNotFoundError): - continue - - # For absolute paths, check if the file exists before testing - if os.path.isabs(semgrep_path): - if not os.path.exists(semgrep_path): - continue - - # Try executing semgrep at this path - try: - subprocess.run( - [semgrep_path, "--version"], check=True, capture_output=True, text=True - ) - return semgrep_path - except (subprocess.SubprocessError, FileNotFoundError): - continue - - return None - - -async def ensure_semgrep_available() -> str: - """ - Ensures semgrep is available and sets the global path in a thread-safe manner - - Returns: - Path to semgrep executable - - Raises: - McpError: If semgrep is not installed or not found - """ - global semgrep_executable - - # Fast path - check if we already have the path - if semgrep_executable: - return semgrep_executable - - # Slow path - acquire lock and find semgrep - async with _semgrep_lock: - # Try to find semgrep - semgrep_path = find_semgrep_path() - - if not semgrep_path: - raise McpError( - ErrorData( - code=INTERNAL_ERROR, - message="Semgrep is not installed or not in your PATH. " - "Please install Semgrep manually before using this tool. 
" - "Installation options: " - "pip install semgrep, " - "macOS: brew install semgrep, " - "Or refer to https://semgrep.dev/docs/getting-started/", - ) - ) - - # Store the path for future use - semgrep_executable = semgrep_path - return semgrep_path - - # Utility functions for handling code content def create_temp_files_from_code_content(code_files: list[CodeFile]) -> str: """ @@ -334,38 +241,6 @@ def validate_code_files(code_files: list[CodeFile]) -> None: ) -async def run_semgrep(args: list[str]) -> str: - """ - Runs semgrep with the given arguments - - Args: - args: List of command arguments - - Returns: - Output of semgrep command - """ - - # Ensure semgrep is available - semgrep_path = await ensure_semgrep_available() - - # Execute semgrep command - process = await asyncio.create_subprocess_exec( - semgrep_path, *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) - - stdout, stderr = await process.communicate() - - if process.returncode != 0: - raise McpError( - ErrorData( - code=INTERNAL_ERROR, - message=f"Error running semgrep: ({process.returncode}) {stderr.decode()}", - ) - ) - - return stdout.decode() - - def remove_temp_dir_from_results(results: SemgrepScanResult, temp_dir: str) -> None: """ Clean the results from semgrep by converting temporary file paths back to @@ -400,13 +275,30 @@ def remove_temp_dir_from_results(results: SemgrepScanResult, temp_dir: str) -> N # MCP Server # --------------------------------------------------------------------------------- + +@asynccontextmanager +async def server_lifespan(_server: FastMCP) -> AsyncIterator[SemgrepContext]: + """Manage server startup and shutdown lifecycle.""" + # Initialize resources on startup + # MCP requires Pro Engine + process = await run_semgrep_process(["mcp", "--pro"]) + + try: + yield SemgrepContext(process=process) + finally: + if process.returncode is None: + # Clean up on shutdown + process.terminate() + else: + print(f"`semgrep mcp` process exited with code 
{process.returncode} already") + + # Create a fast MCP server mcp = FastMCP( "Semgrep", - version=__version__, - request_timeout=DEFAULT_TIMEOUT, stateless_http=True, json_response=True, + lifespan=server_lifespan, ) http_client = httpx.AsyncClient() @@ -756,6 +648,50 @@ async def semgrep_scan( shutil.rmtree(temp_dir, ignore_errors=True) +@mcp.tool() +async def semgrep_scan_rpc( + ctx: Context, + code_files: list[CodeFile] = CODE_FILES_FIELD, +) -> CliOutput: + """ + Runs a Semgrep scan on provided code content using the new Semgrep RPC feature. + + This should run much faster than the comparative `semgrep_scan` tool. + + Use this tool when you need to: + - scan code files for security vulnerabilities + - scan code files for other issues + - scan quickly + """ + + # Validate code_files + # TODO: could this be slow if content is big? + validate_code_files(code_files) + + context: SemgrepContext = ctx.request_context.lifespan_context + + temp_dir = None + try: + # TODO: perhaps should return more interpretable results? 
+ cli_output = await run_semgrep_via_rpc(context, code_files) + return cli_output + except McpError as e: + raise e + except ValidationError as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error parsing semgrep output: {e!s}") + ) from e + except Exception as e: + raise McpError( + ErrorData(code=INTERNAL_ERROR, message=f"Error running semgrep scan: {e!s}") + ) from e + + finally: + if temp_dir: + # Clean up temporary files + shutil.rmtree(temp_dir, ignore_errors=True) + + @mcp.tool() async def semgrep_scan_local( code_files: list[LocalCodeFile] = LOCAL_CODE_FILES_FIELD, @@ -1072,13 +1008,25 @@ async def health(request: Request) -> JSONResponse: envvar="MCP_TRANSPORT", help="Transport protocol to use: stdio, streamable-http, or sse (legacy)", ) -def main(transport: str) -> None: +@click.option( + "--semgrep-path", + type=click.Path(exists=True), + default=None, + envvar="SEMGREP_PATH", + help="Path to the Semgrep binary", +) +def main(transport: str, semgrep_path: str | None) -> None: """Entry point for the MCP server Supports stdio, streamable-http, and sse transports. For stdio, it will read from stdin and write to stdout. For streamable-http and sse, it will start an HTTP server on port 8000. """ + + # Set the executable path in case it's manually specified. 
+ if semgrep_path: + set_semgrep_executable(semgrep_path) + if transport == "stdio": mcp.run(transport="stdio") elif transport == "streamable-http": diff --git a/uv.lock b/uv.lock index daabfba..3916c3f 100644 --- a/uv.lock +++ b/uv.lock @@ -386,7 +386,7 @@ wheels = [ [[package]] name = "mcp" -version = "1.12.1" +version = "1.12.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, @@ -401,9 +401,9 @@ dependencies = [ { name = "starlette" }, { name = "uvicorn", marker = "sys_platform != 'emscripten'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/5c/5a/16cef13b2e60d5f865fbc96372efb23dc8b0591f102dd55003b4ae62f9b1/mcp-1.12.1.tar.gz", hash = "sha256:d1d0bdeb09e4b17c1a72b356248bf3baf75ab10db7008ef865c4afbeb0eb810e", size = 425768, upload-time = "2025-07-22T16:51:41.66Z" } +sdist = { url = "https://files.pythonhosted.org/packages/66/85/f36d538b1286b7758f35c1b69d93f2719d2df90c01bd074eadd35f6afc35/mcp-1.12.2.tar.gz", hash = "sha256:a4b7c742c50ce6ed6d6a6c096cca0e3893f5aecc89a59ed06d47c4e6ba41edcc", size = 426202, upload-time = "2025-07-24T18:29:05.175Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/b9/04/9a967a575518fc958bda1e34a52eae0c7f6accf3534811914fdaf57b0689/mcp-1.12.1-py3-none-any.whl", hash = "sha256:34147f62891417f8b000c39718add844182ba424c8eb2cea250b4267bda4b08b", size = 158463, upload-time = "2025-07-22T16:51:40.086Z" }, + { url = "https://files.pythonhosted.org/packages/2f/cf/3fd38cfe43962452e4bfadc6966b2ea0afaf8e0286cb3991c247c8c33ebd/mcp-1.12.2-py3-none-any.whl", hash = "sha256:b86d584bb60193a42bd78aef01882c5c42d614e416cbf0480149839377ab5a5f", size = 158473, upload-time = "2025-07-24T18:29:03.419Z" }, ] [package.optional-dependencies]