diff --git a/CLAUDE.md b/CLAUDE.md index bf36e2414b..a0c028d07c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -54,7 +54,7 @@ src/ai_company/ providers/ # LLM provider abstraction (LiteLLM adapter) security/ # SecOps agent, approval gates, audit templates/ # Pre-built company templates and builder - tools/ # Tool registry, MCP integration, role-based access, sandboxing + tools/ # Tool registry, built-in git tools, MCP integration, role-based access, sandboxing ``` ## Shell Usage diff --git a/DESIGN_SPEC.md b/DESIGN_SPEC.md index 7bdfa34e85..23e46618be 100644 --- a/DESIGN_SPEC.md +++ b/DESIGN_SPEC.md @@ -1649,6 +1649,8 @@ When the LLM requests multiple tool calls in a single turn, `ToolInvoker.invoke_ The `ToolPermissionChecker` resolves permissions using a priority-based system: denied list (highest) → allowed list → access-level categories → deny (default). `AgentEngine._make_tool_invoker()` creates a permission-aware invoker from the agent's `ToolPermissions` at the start of each `run()` call. Note: M3 implements category-level gating only; the granular sub-constraints described in §11.2 (workspace scope, network mode) are planned for when sandboxing is implemented. +> **M3 implementation note — Built-in git tools:** Six workspace-scoped git tools are implemented in `tools/git_tools.py` with a shared `_BaseGitTool` base class in `tools/_git_base.py`: `GitStatusTool`, `GitLogTool`, `GitDiffTool`, `GitBranchTool`, `GitCommitTool`, and `GitCloneTool`. The base class enforces workspace boundary security (path traversal prevention via `resolve()` + `relative_to()`) and provides a common `_run_git()` helper using `asyncio.create_subprocess_exec` (never `shell=True`). Security hardening includes: `GIT_TERMINAL_PROMPT=0` to prevent credential prompts, `GIT_CONFIG_NOSYSTEM=1`, `GIT_CONFIG_GLOBAL=/dev/null`, and `GIT_PROTOCOL_FROM_USER=0` to restrict config/protocol attack surfaces, rejection of flag-like ref/branch values (starting with `-`), URL scheme validation on clone (only `https://`, `http://`, `ssh://`, `git://`, and SCP-like syntax) with `--` separator before positional URL argument, and clone URLs starting with `-` are rejected. All tools return `ToolExecutionResult` for errors rather than raising exceptions. **Future:** Consider adding host/IP allowlisting for clone URLs to prevent SSRF against internal networks (loopback, link-local, private ranges). + ### 11.1.2 Tool Sandboxing Tool execution requires safety boundaries proportional to the risk of each tool category. The framework uses a **layered sandboxing strategy** with a pluggable `SandboxBackend` protocol — new backends can be added without modifying existing ones. The default configuration uses lighter isolation for low-risk tools and stronger isolation for high-risk tools. @@ -2299,6 +2301,7 @@ ai-company/ │ │ │ ├── budget.py # BUDGET_* constants │ │ │ ├── config.py # CONFIG_* constants │ │ │ ├── execution.py # EXECUTION_* constants +│ │ │ ├── git.py # GIT_* constants │ │ │ ├── prompt.py # PROMPT_* constants │ │ │ ├── provider.py # PROVIDER_* constants │ │ │ ├── role.py # ROLE_* constants @@ -2345,7 +2348,8 @@ ai-company/ │ │ │ ├── subprocess.py # SubprocessSandbox (default for low-risk) │ │ │ └── docker.py # DockerSandbox (for code_runner, terminal) │ │ ├── file_system.py # File operations (M3) -│ │ ├── git_tools.py # Git operations (M3) +│ │ ├── _git_base.py # Base class for git tools (workspace, subprocess) +│ │ ├── git_tools.py # Git operations — 6 built-in tools │ │ ├── code_runner.py # Code execution (M3) │ │ ├── web_tools.py # HTTP, search (M3) │ │ └── mcp_bridge.py # MCP server integration (M7) diff --git a/src/ai_company/observability/events/git.py b/src/ai_company/observability/events/git.py new file mode 100644 index 0000000000..f640f16d2a --- /dev/null +++ b/src/ai_company/observability/events/git.py @@ -0,0 +1,11 @@ +"""Git tool event constants.""" + +from typing import Final + +GIT_COMMAND_START: Final[str] = "git.command.start" +GIT_COMMAND_SUCCESS: Final[str] = "git.command.success" +GIT_COMMAND_FAILED: Final[str] = "git.command.failed" +GIT_COMMAND_TIMEOUT: Final[str] = "git.command.timeout" +GIT_WORKSPACE_VIOLATION: Final[str] = "git.workspace.violation" +GIT_CLONE_URL_REJECTED: Final[str] = "git.clone.url_rejected" +GIT_REF_INJECTION_BLOCKED: Final[str] = "git.ref.injection_blocked" diff --git a/src/ai_company/tools/__init__.py b/src/ai_company/tools/__init__.py index 680bdcc561..33b1bd4f9f 100644 --- a/src/ai_company/tools/__init__.py +++ b/src/ai_company/tools/__init__.py @@ -9,6 +9,14 @@ ToolPermissionDeniedError, ) from .examples.echo import EchoTool +from .git_tools import ( + GitBranchTool, + GitCloneTool, + GitCommitTool, + GitDiffTool, + GitLogTool, + GitStatusTool, +) from .invoker import ToolInvoker from .permissions import ToolPermissionChecker from .registry import ToolRegistry @@ -16,6 +24,12 @@ __all__ = [ "BaseTool", "EchoTool", + "GitBranchTool", + "GitCloneTool", + "GitCommitTool", + "GitDiffTool", + "GitLogTool", + "GitStatusTool", "ToolError", "ToolExecutionError", "ToolExecutionResult", diff --git a/src/ai_company/tools/_git_base.py b/src/ai_company/tools/_git_base.py new file mode 100644 index 0000000000..fc72576342 --- /dev/null +++ b/src/ai_company/tools/_git_base.py @@ -0,0 +1,266 @@ +"""Base class for workspace-scoped git tools. + +Provides ``_BaseGitTool`` with helper methods for running git +subprocesses, validating relative paths against the workspace +boundary, and rejecting flag-injection attempts. Subprocess +execution uses ``asyncio.create_subprocess_exec`` (never +``shell=True``) with ``GIT_TERMINAL_PROMPT=0``, +``GIT_CONFIG_NOSYSTEM=1``, ``GIT_CONFIG_GLOBAL`` pointed to +``/dev/null``, and ``GIT_PROTOCOL_FROM_USER=0`` to prevent +interactive prompts and restrict config/protocol attack surfaces. +""" + +import asyncio +import os +from abc import ABC +from pathlib import Path # noqa: TC003 — used at runtime +from typing import Any, Final + +from ai_company.core.enums import ToolCategory +from ai_company.observability import get_logger +from ai_company.observability.events.git import ( + GIT_COMMAND_FAILED, + GIT_COMMAND_START, + GIT_COMMAND_SUCCESS, + GIT_COMMAND_TIMEOUT, + GIT_REF_INJECTION_BLOCKED, + GIT_WORKSPACE_VIOLATION, +) +from ai_company.tools.base import BaseTool, ToolExecutionResult + +logger = get_logger(__name__) + +_DEFAULT_TIMEOUT: Final[float] = 30.0 + + +class _BaseGitTool(BaseTool, ABC): + """Shared base for all git tools. + + Holds the ``workspace`` path and provides helper methods for running + git commands and validating relative paths against the workspace + boundary. + + Attributes: + workspace: Absolute path to the agent's workspace directory. + """ + + def __init__( + self, + *, + name: str, + description: str, + parameters_schema: dict[str, Any], + workspace: Path, + ) -> None: + """Initialize a git tool bound to a workspace. + + Args: + name: Tool name. + description: Human-readable description. + parameters_schema: JSON Schema for tool parameters. + workspace: Absolute path to the workspace root. + + Raises: + ValueError: If *workspace* is not an absolute path. + """ + if not workspace.is_absolute(): + msg = f"workspace must be an absolute path, got: {workspace}" + raise ValueError(msg) + super().__init__( + name=name, + description=description, + parameters_schema=parameters_schema, + category=ToolCategory.VERSION_CONTROL, + ) + self._workspace = workspace.resolve() + + @property + def workspace(self) -> Path: + """Workspace root directory.""" + return self._workspace + + def _validate_path(self, relative: str) -> Path: + """Resolve a relative path and verify it stays within workspace. + + Args: + relative: A relative path string from the LLM. + + Returns: + The resolved absolute ``Path``. + + Raises: + ValueError: If the path escapes the workspace boundary or + cannot be resolved. + """ + try: + resolved = (self._workspace / relative).resolve() + except OSError: + logger.warning( + GIT_WORKSPACE_VIOLATION, + path=relative, + workspace=str(self._workspace), + error="path resolution failed", + ) + msg = f"Path '{relative}' could not be resolved" + raise ValueError(msg) from None + try: + resolved.relative_to(self._workspace) + except ValueError: + logger.warning( + GIT_WORKSPACE_VIOLATION, + path=relative, + workspace=str(self._workspace), + ) + msg = f"Path '{relative}' is outside workspace" + raise ValueError(msg) from None + return resolved + + def _check_paths(self, paths: list[str]) -> ToolExecutionResult | None: + """Validate a list of paths, returning an error result or None. + + Args: + paths: Relative path strings to validate. + + Returns: + A ``ToolExecutionResult`` with ``is_error=True`` if any path + escapes the workspace, or ``None`` if all paths are valid. + """ + for p in paths: + try: + self._validate_path(p) + except ValueError as exc: + return ToolExecutionResult( + content=str(exc), + is_error=True, + ) + return None + + def _check_ref( + self, + value: str, + *, + param: str, + ) -> ToolExecutionResult | None: + """Reject values starting with ``-`` to prevent flag injection. + + Args: + value: The ref or branch name string to validate. + param: Parameter name for the error message. + + Returns: + A ``ToolExecutionResult`` with ``is_error=True`` if the value + starts with ``-``, or ``None`` if valid. + """ + if value.startswith("-"): + logger.warning( + GIT_REF_INJECTION_BLOCKED, + param=param, + value=value, + ) + return ToolExecutionResult( + content=f"Invalid {param}: must not start with '-'", + is_error=True, + ) + return None + + async def _run_git( + self, + args: list[str], + *, + cwd: Path | None = None, + deadline: float = _DEFAULT_TIMEOUT, + ) -> ToolExecutionResult: + """Run a git subprocess and return the result. + + Args: + args: Arguments to pass after ``git``. + cwd: Working directory (defaults to workspace). + deadline: Seconds before the process is killed. + + Returns: + A ``ToolExecutionResult`` with stdout as content on success, + or stderr/error message with ``is_error=True`` on failure. + """ + work_dir = cwd or self._workspace + env = { + **os.environ, + "GIT_TERMINAL_PROMPT": "0", + "GIT_CONFIG_NOSYSTEM": "1", + "GIT_CONFIG_GLOBAL": os.devnull, + "GIT_PROTOCOL_FROM_USER": "0", + } + + logger.debug( + GIT_COMMAND_START, + command=["git", *args], + cwd=str(work_dir), + ) + + try: + proc = await asyncio.create_subprocess_exec( + "git", + *args, + cwd=work_dir, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + ) + except OSError: + logger.warning( + GIT_COMMAND_FAILED, + command=["git", *args], + error="subprocess start failed", + exc_info=True, + ) + return ToolExecutionResult( + content="Failed to start git process", + is_error=True, + ) + + try: + stdout_bytes, stderr_bytes = await asyncio.wait_for( + proc.communicate(), + timeout=deadline, + ) + except TimeoutError: + proc.kill() + try: + await asyncio.wait_for(proc.communicate(), timeout=5.0) + except TimeoutError: + logger.warning( + GIT_COMMAND_FAILED, + command=["git", *args], + error="process did not terminate after kill", + ) + logger.warning( + GIT_COMMAND_TIMEOUT, + command=["git", *args], + deadline=deadline, + ) + return ToolExecutionResult( + content=f"Git command timed out after {deadline}s", + is_error=True, + ) + + stdout = stdout_bytes.decode("utf-8", errors="replace").strip() + stderr = stderr_bytes.decode("utf-8", errors="replace").strip() + + if proc.returncode != 0: + logger.warning( + GIT_COMMAND_FAILED, + command=["git", *args], + returncode=proc.returncode, + stderr=stderr, + stdout=stdout, + ) + error_output = stderr or stdout or "Unknown git error" + return ToolExecutionResult( + content=error_output, + is_error=True, + ) + + logger.debug( + GIT_COMMAND_SUCCESS, + command=["git", *args], + ) + return ToolExecutionResult(content=stdout) diff --git a/src/ai_company/tools/git_tools.py b/src/ai_company/tools/git_tools.py new file mode 100644 index 0000000000..d35f4a7d1d --- /dev/null +++ b/src/ai_company/tools/git_tools.py @@ -0,0 +1,640 @@ +"""Built-in git tools for version control operations. + +Provides workspace-scoped git tools that agents use to interact with +git repositories. All tools enforce workspace boundary security — the +LLM never controls absolute paths. See ``_git_base._BaseGitTool`` for +the subprocess execution model, environment hardening, and path +validation shared by all tools. +""" + +from pathlib import Path # noqa: TC003 — used at runtime +from typing import Any, Final + +from ai_company.observability import get_logger +from ai_company.observability.events.git import ( + GIT_CLONE_URL_REJECTED, + GIT_COMMAND_START, +) +from ai_company.tools._git_base import _BaseGitTool +from ai_company.tools.base import ToolExecutionResult + +logger = get_logger(__name__) + +_CLONE_TIMEOUT: Final[float] = 120.0 +_ALLOWED_CLONE_SCHEMES: Final[tuple[str, ...]] = ( + "https://", + "http://", + "ssh://", + "git://", +) + + +def _is_allowed_clone_url(url: str) -> bool: + """Check if a clone URL uses an allowed remote scheme. + + Allows standard remote schemes and SCP-like syntax. Rejects + ``file://``, ``ext::``, bare local paths, and URLs starting with + ``-`` (flag injection). + + Args: + url: Repository URL string to validate. + + Returns: + ``True`` if the URL scheme is allowed. + """ + if url.startswith("-"): + return False + if any(url.startswith(scheme) for scheme in _ALLOWED_CLONE_SCHEMES): + return True + # SCP-like syntax: user@host:path (e.g. git@github.com:user/repo.git). + # Must have @ and : but NOT :: (rejects ext:: protocol) and NOT :// + # (rejects URLs that should match a scheme above). + return "@" in url and ":" in url and "::" not in url and "://" not in url + + +# ── GitStatusTool ───────────────────────────────────────────────── + + +class GitStatusTool(_BaseGitTool): + """Show the working tree status of the git repository. + + Returns the output of ``git status`` with optional short or + porcelain formatting. + """ + + def __init__(self, *, workspace: Path) -> None: + """Initialize the git_status tool. + + Args: + workspace: Absolute path to the workspace root. + """ + super().__init__( + name="git_status", + description=( + "Show the working tree status. Returns modified, staged, " + "and untracked files in the workspace repository." + ), + parameters_schema={ + "type": "object", + "properties": { + "short": { + "type": "boolean", + "description": "Use short format output.", + "default": False, + }, + "porcelain": { + "type": "boolean", + "description": ("Use machine-readable porcelain format."), + "default": False, + }, + }, + "additionalProperties": False, + }, + workspace=workspace, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Run ``git status``. + + Args: + arguments: Optional ``short`` and ``porcelain`` flags. + + Returns: + A ``ToolExecutionResult`` with the status output. + """ + args = ["status"] + if arguments.get("porcelain"): + args.append("--porcelain") + elif arguments.get("short"): + args.append("--short") + return await self._run_git(args) + + +# ── GitLogTool ──────────────────────────────────────────────────── + + +class GitLogTool(_BaseGitTool): + """Show commit log history. + + Returns recent commits with optional filtering by count, author, + date range, ref, and paths. + """ + + _MAX_COUNT_LIMIT: int = 100 + + def __init__(self, *, workspace: Path) -> None: + """Initialize the git_log tool. + + Args: + workspace: Absolute path to the workspace root. + """ + super().__init__( + name="git_log", + description=( + "Show commit log. Returns recent commits with optional " + "filtering by count, author, date range, ref, and paths." + ), + parameters_schema={ + "type": "object", + "properties": { + "max_count": { + "type": "integer", + "description": ("Max commits (default 10, max 100)."), + "default": 10, + "minimum": 1, + "maximum": 100, + }, + "oneline": { + "type": "boolean", + "description": "Use one-line format.", + "default": False, + }, + "ref": { + "type": "string", + "description": ("Branch, tag, or commit ref to start from."), + }, + "author": { + "type": "string", + "description": ("Filter commits by author pattern."), + }, + "since": { + "type": "string", + "description": ("Show commits after date (e.g. '2024-01-01')."), + }, + "until": { + "type": "string", + "description": "Show commits before this date.", + }, + "paths": { + "type": "array", + "items": {"type": "string"}, + "description": ("Limit to commits touching these paths."), + }, + }, + "additionalProperties": False, + }, + workspace=workspace, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Run ``git log``. + + Args: + arguments: Log options (max_count, oneline, ref, author, + since, until, paths). + + Returns: + A ``ToolExecutionResult`` with the log output. + """ + max_count = min( + arguments.get("max_count", 10), + self._MAX_COUNT_LIMIT, + ) + args = ["log", f"--max-count={max_count}"] + + if arguments.get("oneline"): + args.append("--oneline") + + if author := arguments.get("author"): + args.append(f"--author={author}") + + if since := arguments.get("since"): + args.append(f"--since={since}") + + if until := arguments.get("until"): + args.append(f"--until={until}") + + if ref := arguments.get("ref"): + if err := self._check_ref(ref, param="ref"): + return err + args.append(ref) + + paths: list[str] = arguments.get("paths", []) + if paths: + if err := self._check_paths(paths): + return err + args.append("--") + args.extend(paths) + + result = await self._run_git(args) + if not result.is_error and not result.content: + return ToolExecutionResult(content="No commits found") + return result + + +# ── GitDiffTool ─────────────────────────────────────────────────── + + +class GitDiffTool(_BaseGitTool): + """Show changes between commits, the index, and the working tree. + + Returns the output of ``git diff`` with optional ref comparison, + staged changes view, stat summary, and path filtering. + """ + + def __init__(self, *, workspace: Path) -> None: + """Initialize the git_diff tool. + + Args: + workspace: Absolute path to the workspace root. + """ + super().__init__( + name="git_diff", + description=( + "Show changes between commits, index, and working tree. " + "Supports staged changes, ref comparison, and path " + "filtering." + ), + parameters_schema={ + "type": "object", + "properties": { + "staged": { + "type": "boolean", + "description": "Show staged (cached) changes.", + "default": False, + }, + "ref1": { + "type": "string", + "description": "First ref for comparison.", + }, + "ref2": { + "type": "string", + "description": "Second ref for comparison.", + }, + "stat": { + "type": "boolean", + "description": ("Show diffstat summary instead of full diff."), + "default": False, + }, + "paths": { + "type": "array", + "items": {"type": "string"}, + "description": "Limit diff to these paths.", + }, + }, + "additionalProperties": False, + }, + workspace=workspace, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Run ``git diff``. + + Args: + arguments: Diff options (staged, ref1, ref2, stat, paths). + + Returns: + A ``ToolExecutionResult`` with the diff output. Empty diff + returns "No changes" (not an error). + """ + args = ["diff"] + + if arguments.get("staged"): + args.append("--cached") + + if arguments.get("stat"): + args.append("--stat") + + if ref1 := arguments.get("ref1"): + if err := self._check_ref(ref1, param="ref1"): + return err + args.append(ref1) + if ref2 := arguments.get("ref2"): + if err := self._check_ref(ref2, param="ref2"): + return err + args.append(ref2) + + paths: list[str] = arguments.get("paths", []) + if paths: + if err := self._check_paths(paths): + return err + args.append("--") + args.extend(paths) + + result = await self._run_git(args) + if not result.is_error and not result.content: + return ToolExecutionResult(content="No changes") + return result + + +# ── GitBranchTool ───────────────────────────────────────────────── + + +class GitBranchTool(_BaseGitTool): + """Manage branches — list, create, switch, or delete. + + Supports listing all branches, creating new branches (optionally + from a start point), switching between branches, and deleting + branches. + """ + + _ACTIONS_REQUIRING_NAME = frozenset({"create", "switch", "delete"}) + + def __init__(self, *, workspace: Path) -> None: + """Initialize the git_branch tool. + + Args: + workspace: Absolute path to the workspace root. + """ + super().__init__( + name="git_branch", + description=( + "Manage branches: list, create, switch, or delete. " + "Provide an action and branch name as needed." + ), + parameters_schema={ + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": [ + "list", + "create", + "switch", + "delete", + ], + "description": "Branch action to perform.", + "default": "list", + }, + "name": { + "type": "string", + "description": ( + "Branch name (required for create/switch/delete)." + ), + }, + "start_point": { + "type": "string", + "description": ("Starting ref for branch creation."), + }, + "force": { + "type": "boolean", + "description": ( + "Force delete (-D) instead of safe delete (-d)." + ), + "default": False, + }, + }, + "required": ["action"], + "additionalProperties": False, + }, + workspace=workspace, + ) + + async def _list_branches(self) -> ToolExecutionResult: + """List all branches.""" + result = await self._run_git(["branch", "-a"]) + if not result.is_error and not result.content: + return ToolExecutionResult(content="No branches found") + return result + + async def _create_branch( + self, + name: str, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Create a branch, optionally from a start point.""" + args = ["branch", name] + if start_point := arguments.get("start_point"): + if err := self._check_ref(start_point, param="start_point"): + return err + args.append(start_point) + return await self._run_git(args) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Run a branch operation. + + Args: + arguments: Branch action, name, start_point, force. + + Returns: + A ``ToolExecutionResult`` with the operation output. + """ + action: str = arguments.get("action", "list") + name: str | None = arguments.get("name") + + if action in self._ACTIONS_REQUIRING_NAME and not name: + return ToolExecutionResult( + content=(f"Branch name is required for '{action}' action"), + is_error=True, + ) + + if action == "list": + return await self._list_branches() + + # Narrowing: guaranteed non-None by guard above. + branch_name: str = name # type: ignore[assignment] + + if err := self._check_ref(branch_name, param="name"): + return err + + if action == "create": + return await self._create_branch(branch_name, arguments) + + if action == "switch": + return await self._run_git(["switch", branch_name]) + + # action == "delete" (per schema enum constraint) + flag = "-D" if arguments.get("force") else "-d" + return await self._run_git(["branch", flag, branch_name]) + + +# ── GitCommitTool ───────────────────────────────────────────────── + + +class GitCommitTool(_BaseGitTool): + """Stage and commit changes. + + Stages specified paths (or all changes with ``all=True``), then + creates a commit with the provided message. + """ + + def __init__(self, *, workspace: Path) -> None: + """Initialize the git_commit tool. + + Args: + workspace: Absolute path to the workspace root. + """ + super().__init__( + name="git_commit", + description=( + "Stage and commit changes. Provide a commit message and " + "optionally specify paths to stage or use 'all' to stage " + "everything." + ), + parameters_schema={ + "type": "object", + "properties": { + "message": { + "type": "string", + "description": "Commit message.", + }, + "paths": { + "type": "array", + "items": {"type": "string"}, + "description": ("Paths to stage before committing."), + }, + "all": { + "type": "boolean", + "description": ("Stage all modified and deleted files."), + "default": False, + }, + }, + "required": ["message"], + "additionalProperties": False, + }, + workspace=workspace, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Stage and commit changes. + + Args: + arguments: Commit message, optional paths, optional all flag. + + Returns: + A ``ToolExecutionResult`` with the commit output. + """ + message: str = arguments["message"] + paths: list[str] = arguments.get("paths", []) + stage_all: bool = arguments.get("all", False) + + if paths: + if err := self._check_paths(paths): + return err + add_result = await self._run_git(["add", "--", *paths]) + if add_result.is_error: + return add_result + elif stage_all: + add_result = await self._run_git(["add", "-A"]) + if add_result.is_error: + return add_result + else: + logger.debug( + GIT_COMMAND_START, + command=["git", "commit"], + note="no staging requested; committing already staged", + ) + + return await self._run_git(["commit", "-m", message]) + + +# ── GitCloneTool ────────────────────────────────────────────────── + + +class GitCloneTool(_BaseGitTool): + """Clone a git repository into the workspace. + + Validates that the target directory stays within the workspace + boundary. Supports optional branch selection and shallow clone + depth. URLs are validated against allowed schemes (https, http, + ssh, git, SCP-like). Local paths and ``file://`` URLs are + rejected. + """ + + def __init__(self, *, workspace: Path) -> None: + """Initialize the git_clone tool. + + Args: + workspace: Absolute path to the workspace root. + """ + super().__init__( + name="git_clone", + description=( + "Clone a git repository into a directory within the " + "workspace. Supports branch selection and shallow clones." + ), + parameters_schema={ + "type": "object", + "properties": { + "url": { + "type": "string", + "description": "Repository URL to clone.", + }, + "directory": { + "type": "string", + "description": ("Target directory name within workspace."), + }, + "branch": { + "type": "string", + "description": "Branch to clone.", + }, + "depth": { + "type": "integer", + "description": "Shallow clone depth.", + "minimum": 1, + }, + }, + "required": ["url"], + "additionalProperties": False, + }, + workspace=workspace, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Clone a repository. + + Args: + arguments: Clone URL, optional directory, branch, depth. + + Returns: + A ``ToolExecutionResult`` with the clone output. + """ + url: str = arguments["url"] + + if not _is_allowed_clone_url(url): + logger.warning( + GIT_CLONE_URL_REJECTED, + url=url, + ) + return ToolExecutionResult( + content=( + "Invalid clone URL. Only https://, http://, ssh://, " + "git://, and SCP-like (user@host:path) URLs are " + "allowed" + ), + is_error=True, + ) + + args = ["clone"] + + if branch := arguments.get("branch"): + if err := self._check_ref(branch, param="branch"): + return err + args.extend(["--branch", branch]) + + if depth := arguments.get("depth"): + args.extend(["--depth", str(depth)]) + + args.append("--") + args.append(url) + + if directory := arguments.get("directory"): + if err := self._check_paths([directory]): + return err + args.append(directory) + + return await self._run_git(args, deadline=_CLONE_TIMEOUT) diff --git a/tests/unit/observability/test_events.py b/tests/unit/observability/test_events.py index 226565763c..d26284c7d0 100644 --- a/tests/unit/observability/test_events.py +++ b/tests/unit/observability/test_events.py @@ -14,6 +14,15 @@ CONFIG_VALIDATION_FAILED, ) from ai_company.observability.events.execution import EXECUTION_TASK_CREATED +from ai_company.observability.events.git import ( + GIT_CLONE_URL_REJECTED, + GIT_COMMAND_FAILED, + GIT_COMMAND_START, + GIT_COMMAND_SUCCESS, + GIT_COMMAND_TIMEOUT, + GIT_REF_INJECTION_BLOCKED, + GIT_WORKSPACE_VIOLATION, +) from ai_company.observability.events.prompt import PROMPT_BUILD_START from ai_company.observability.events.provider import ( PROVIDER_CALL_START, @@ -74,6 +83,7 @@ def test_all_domain_modules_discovered(self) -> None: "budget", "config", "execution", + "git", "prompt", "provider", "role", @@ -116,5 +126,14 @@ def test_routing_events_exist(self) -> None: def test_prompt_events_exist(self) -> None: assert PROMPT_BUILD_START == "prompt.build.start" + def test_git_events_exist(self) -> None: + assert GIT_COMMAND_START == "git.command.start" + assert GIT_COMMAND_SUCCESS == "git.command.success" + assert GIT_COMMAND_FAILED == "git.command.failed" + assert GIT_COMMAND_TIMEOUT == "git.command.timeout" + assert GIT_WORKSPACE_VIOLATION == "git.workspace.violation" + assert GIT_CLONE_URL_REJECTED == "git.clone.url_rejected" + assert GIT_REF_INJECTION_BLOCKED == "git.ref.injection_blocked" + def test_tool_events_exist(self) -> None: assert TOOL_INVOKE_START == "tool.invoke.start" diff --git a/tests/unit/tools/git/__init__.py b/tests/unit/tools/git/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit/tools/git/conftest.py b/tests/unit/tools/git/conftest.py new file mode 100644 index 0000000000..54ec344b0d --- /dev/null +++ b/tests/unit/tools/git/conftest.py @@ -0,0 +1,171 @@ +"""Fixtures for git tool tests.""" + +import os +import subprocess +from pathlib import Path # noqa: TC003 — pytest evaluates annotations + +import pytest + +import ai_company.tools.git_tools as git_tools_module +from ai_company.tools.git_tools import ( + GitBranchTool, + GitCloneTool, + GitCommitTool, + GitDiffTool, + GitLogTool, + GitStatusTool, +) + +_GIT_ENV = { + **os.environ, + "GIT_AUTHOR_NAME": "Test", + "GIT_AUTHOR_EMAIL": "test@test.local", + "GIT_COMMITTER_NAME": "Test", + "GIT_COMMITTER_EMAIL": "test@test.local", + "GIT_TERMINAL_PROMPT": "0", + "GIT_CONFIG_NOSYSTEM": "1", + "GIT_PROTOCOL_FROM_USER": "0", +} + + +def _run_git(args: list[str], cwd: Path) -> None: + """Run a git command synchronously.""" + subprocess.run( # noqa: S603 + ["git", *args], # noqa: S607 + cwd=cwd, + check=True, + capture_output=True, + env=_GIT_ENV, + ) + + +def _run_git_output(args: list[str], cwd: Path) -> str: + """Run a git command and return stdout.""" + result = subprocess.run( # noqa: S603 + ["git", *args], # noqa: S607 + cwd=cwd, + check=True, + capture_output=True, + text=True, + env=_GIT_ENV, + ) + return result.stdout.strip() + + +@pytest.fixture +def workspace(tmp_path: Path) -> Path: + """Bare workspace directory (no git repo).""" + return tmp_path + + +@pytest.fixture +def git_repo(tmp_path: Path) -> Path: + """Initialized git repo with one commit.""" + _run_git(["init"], tmp_path) + _run_git(["config", "user.name", "Test"], tmp_path) + _run_git(["config", "user.email", "test@test.local"], tmp_path) + readme = tmp_path / "README.md" + readme.write_text("# Test\n") + _run_git(["add", "."], tmp_path) + _run_git(["commit", "-m", "initial commit"], tmp_path) + return tmp_path + + +@pytest.fixture +def empty_git_repo(tmp_path: Path) -> Path: + """Initialized git repo with no commits.""" + _run_git(["init"], tmp_path) + return tmp_path + + +@pytest.fixture +def git_repo_with_changes(git_repo: Path) -> Path: + """Git repo with uncommitted changes.""" + (git_repo / "new_file.txt").write_text("hello\n") + (git_repo / "README.md").write_text("# Modified\n") + return git_repo + + +@pytest.fixture +def detached_head_repo(git_repo: Path) -> Path: + """Git repo with HEAD detached at initial commit.""" + sha = _run_git_output(["rev-parse", "HEAD"], git_repo) + _run_git(["checkout", sha], git_repo) + return git_repo + + +@pytest.fixture +def merge_conflict_repo(git_repo: Path) -> Path: + """Git repo with an active merge conflict.""" + _run_git(["checkout", "-b", "conflict-branch"], git_repo) + (git_repo / "README.md").write_text("# Conflict branch\n") + _run_git(["add", "."], git_repo) + _run_git(["commit", "-m", "conflict change"], git_repo) + _run_git(["switch", "-"], git_repo) + (git_repo / "README.md").write_text("# Main change\n") + _run_git(["add", "."], git_repo) + _run_git(["commit", "-m", "main change"], git_repo) + # Merge will fail with conflict — don't use check=True + subprocess.run( + ["git", "merge", "conflict-branch"], # noqa: S607 + cwd=git_repo, + capture_output=True, + check=False, + env=_GIT_ENV, + ) + return git_repo + + +# ── Tool factory fixtures ──────────────────────────────────────── + + +@pytest.fixture +def status_tool(git_repo: Path) -> GitStatusTool: + """GitStatusTool bound to the test repo.""" + return GitStatusTool(workspace=git_repo) + + +@pytest.fixture +def log_tool(git_repo: Path) -> GitLogTool: + """GitLogTool bound to the test repo.""" + return GitLogTool(workspace=git_repo) + + +@pytest.fixture +def diff_tool(git_repo: Path) -> GitDiffTool: + """GitDiffTool bound to the test repo.""" + return GitDiffTool(workspace=git_repo) + + +@pytest.fixture +def branch_tool(git_repo: Path) -> GitBranchTool: + """GitBranchTool bound to the test repo.""" + return GitBranchTool(workspace=git_repo) + + +@pytest.fixture +def commit_tool(git_repo: Path) -> GitCommitTool: + """GitCommitTool bound to the test repo.""" + return GitCommitTool(workspace=git_repo) + + +@pytest.fixture +def clone_tool(workspace: Path) -> GitCloneTool: + """GitCloneTool bound to a bare workspace.""" + return GitCloneTool(workspace=workspace) + + +@pytest.fixture +def allow_local_clone(monkeypatch: pytest.MonkeyPatch) -> None: + """Allow local file paths in clone URL validation for testing. + + Also sets ``GIT_ALLOW_PROTOCOL=file`` so that ``_run_git`` (which + uses ``GIT_CONFIG_GLOBAL=/dev/null``) still permits the ``file`` + transport. + """ + monkeypatch.setattr( + git_tools_module, + "_is_allowed_clone_url", + lambda url: True, + ) + monkeypatch.setenv("GIT_ALLOW_PROTOCOL", "file") diff --git a/tests/unit/tools/git/test_git_tools.py b/tests/unit/tools/git/test_git_tools.py new file mode 100644 index 0000000000..d25677c0ae --- /dev/null +++ b/tests/unit/tools/git/test_git_tools.py @@ -0,0 +1,727 @@ +"""Tests for built-in git tools.""" + +import asyncio +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from ai_company.core.enums import ToolCategory +from ai_company.tools.git_tools import ( + GitBranchTool, + GitCloneTool, + GitCommitTool, + GitDiffTool, + GitLogTool, + GitStatusTool, +) + +from .conftest import _run_git + +pytestmark = pytest.mark.timeout(30) + +_ALL_GIT_TOOL_CLASSES = [ + GitStatusTool, + GitLogTool, + GitDiffTool, + GitBranchTool, + GitCommitTool, + GitCloneTool, +] + + +# ── Workspace validation (shared across tools) ─────────────────── + + +@pytest.mark.unit +class TestWorkspaceValidation: + """Path traversal and boundary enforcement.""" + + async def test_path_traversal_blocked(self, status_tool: GitStatusTool) -> None: + tool = GitLogTool(workspace=status_tool.workspace) + result = await tool.execute( + arguments={"paths": ["../../etc/passwd"]}, + ) + assert result.is_error + + async def test_absolute_path_outside_workspace(self, git_repo: Path) -> None: + tool = GitDiffTool(workspace=git_repo) + outside = str(git_repo.parent / "outside") + result = await tool.execute( + arguments={"paths": [outside]}, + ) + assert result.is_error + + async def test_symlink_escape_blocked(self, git_repo: Path) -> None: + outside = git_repo.parent / "outside_dir" + outside.mkdir() + (outside / "secret.txt").write_text("secret") + link = git_repo / "escape_link" + try: + link.symlink_to(outside) + except OSError: + pytest.skip("Cannot create symlinks") + tool = GitDiffTool(workspace=git_repo) + result = await tool.execute( + arguments={"paths": ["escape_link/secret.txt"]}, + ) + assert result.is_error + + async def test_valid_relative_path_accepted(self, git_repo: Path) -> None: + tool = GitDiffTool(workspace=git_repo) + result = await tool.execute( + arguments={"paths": ["README.md"]}, + ) + assert not result.is_error + + def test_workspace_must_be_absolute(self, tmp_path: Path) -> None: + with pytest.raises(ValueError, match="absolute path"): + GitStatusTool(workspace=Path("relative/path")) + + +# ── Tool properties ────────────────────────────────────────────── + + +@pytest.mark.unit +class TestToolProperties: + """Name, description, category, and schema for all git tools.""" + + @pytest.mark.parametrize( + ("tool_cls", "expected_name"), + [ + (GitStatusTool, "git_status"), + (GitLogTool, "git_log"), + (GitDiffTool, "git_diff"), + (GitBranchTool, "git_branch"), + (GitCommitTool, "git_commit"), + (GitCloneTool, "git_clone"), + ], + ) + def test_name( + self, + tool_cls: type, + expected_name: str, + tmp_path: Path, + ) -> None: + tool = tool_cls(workspace=tmp_path) + assert tool.name == expected_name + + @pytest.mark.parametrize("tool_cls", _ALL_GIT_TOOL_CLASSES) + def test_category_is_version_control(self, tool_cls: type, tmp_path: Path) -> None: + tool = tool_cls(workspace=tmp_path) + assert tool.category == ToolCategory.VERSION_CONTROL + + @pytest.mark.parametrize("tool_cls", _ALL_GIT_TOOL_CLASSES) + def test_has_schema(self, tool_cls: type, tmp_path: Path) -> None: + tool = tool_cls(workspace=tmp_path) + schema = tool.parameters_schema + assert schema is not None + assert schema["type"] == "object" + + @pytest.mark.parametrize("tool_cls", _ALL_GIT_TOOL_CLASSES) + def test_description_not_empty(self, tool_cls: type, tmp_path: Path) -> None: + tool = tool_cls(workspace=tmp_path) + assert tool.description + + +# ── GitStatusTool ───────────────────────────────────────────────── + + +@pytest.mark.unit +class TestGitStatusTool: + """Tests for git_status.""" + + async def test_clean_repo(self, status_tool: GitStatusTool) -> None: + result = await status_tool.execute(arguments={}) + assert not result.is_error + + async def test_short_format(self, git_repo_with_changes: Path) -> None: + tool = GitStatusTool(workspace=git_repo_with_changes) + result = await tool.execute( + arguments={"short": True}, + ) + assert not result.is_error + assert result.content + + async def test_porcelain_format(self, git_repo_with_changes: Path) -> None: + tool = GitStatusTool(workspace=git_repo_with_changes) + result = await tool.execute( + arguments={"porcelain": True}, + ) + assert not result.is_error + assert "README.md" in result.content or "new_file" in result.content + + async def test_not_a_git_repo(self, workspace: Path) -> None: + tool = GitStatusTool(workspace=workspace) + result = await tool.execute(arguments={}) + assert result.is_error + + +# ── GitLogTool ──────────────────────────────────────────────────── + + +@pytest.mark.unit +class TestGitLogTool: + """Tests for git_log.""" + + async def test_shows_initial_commit(self, log_tool: GitLogTool) -> None: + result = await log_tool.execute(arguments={}) + assert not result.is_error + assert "initial commit" in result.content.lower() + + async def test_oneline_format(self, log_tool: GitLogTool) -> None: + result = await log_tool.execute( + arguments={"oneline": True}, + ) + assert not result.is_error + lines = result.content.strip().split("\n") + assert len(lines) >= 1 + + async def test_max_count_respected(self, log_tool: GitLogTool) -> None: + result = await log_tool.execute( + arguments={"max_count": 1, "oneline": True}, + ) + assert not result.is_error + lines = result.content.strip().split("\n") + assert len(lines) == 1 + + async def test_max_count_clamped(self, log_tool: GitLogTool) -> None: + result = await log_tool.execute( + arguments={"max_count": 200}, + ) + assert not result.is_error + + async def test_empty_repo_no_commits(self, empty_git_repo: Path) -> None: + tool = GitLogTool(workspace=empty_git_repo) + result = await tool.execute(arguments={}) + assert result.is_error + + async def test_author_filter(self, log_tool: GitLogTool) -> None: + result = await log_tool.execute( + arguments={"author": "NoSuchAuthor"}, + ) + assert not result.is_error + assert "No commits found" in result.content + + async def test_path_filter(self, log_tool: GitLogTool) -> None: + result = await log_tool.execute( + arguments={"paths": ["README.md"]}, + ) + assert not result.is_error + assert "initial commit" in result.content.lower() + + async def test_ref_parameter(self, log_tool: GitLogTool) -> None: + result = await log_tool.execute( + arguments={"ref": "HEAD"}, + ) + assert not result.is_error + + +# ── GitDiffTool ─────────────────────────────────────────────────── + + +@pytest.mark.unit +class TestGitDiffTool: + """Tests for git_diff.""" + + async def test_no_changes_returns_message(self, diff_tool: GitDiffTool) -> None: + result = await diff_tool.execute(arguments={}) + assert not result.is_error + assert result.content == "No changes" + + async def test_unstaged_changes(self, git_repo_with_changes: Path) -> None: + tool = GitDiffTool(workspace=git_repo_with_changes) + result = await tool.execute(arguments={}) + assert not result.is_error + assert "Modified" in result.content or "README" in result.content + + async def test_staged_changes(self, git_repo_with_changes: Path) -> None: + _run_git(["add", "README.md"], git_repo_with_changes) + tool = GitDiffTool(workspace=git_repo_with_changes) + result = await tool.execute( + arguments={"staged": True}, + ) + assert not result.is_error + assert "README" in result.content + + async def test_stat_format(self, git_repo_with_changes: Path) -> None: + tool = GitDiffTool(workspace=git_repo_with_changes) + result = await tool.execute(arguments={"stat": True}) + assert not result.is_error + + async def test_ref_comparison(self, diff_tool: GitDiffTool) -> None: + result = await diff_tool.execute( + arguments={"ref1": "HEAD", "ref2": "HEAD"}, + ) + assert not result.is_error + + async def test_path_filter(self, git_repo_with_changes: Path) -> None: + tool = GitDiffTool(workspace=git_repo_with_changes) + result = await tool.execute( + arguments={"paths": ["README.md"]}, + ) + assert not result.is_error + + +# ── GitBranchTool ───────────────────────────────────────────────── + + +@pytest.mark.unit +class TestGitBranchTool: + """Tests for git_branch.""" + + async def test_list_branches(self, branch_tool: GitBranchTool) -> None: + result = await branch_tool.execute( + arguments={"action": "list"}, + ) + assert not result.is_error + + async def test_create_branch(self, branch_tool: GitBranchTool) -> None: + result = await branch_tool.execute( + arguments={ + "action": "create", + "name": "feature/test", + }, + ) + assert not result.is_error + + async def test_create_and_switch(self, branch_tool: GitBranchTool) -> None: + await branch_tool.execute( + arguments={ + "action": "create", + "name": "feature/switch-test", + }, + ) + result = await branch_tool.execute( + arguments={ + "action": "switch", + "name": "feature/switch-test", + }, + ) + assert not result.is_error + + async def test_delete_branch(self, branch_tool: GitBranchTool) -> None: + await branch_tool.execute( + arguments={ + "action": "create", + "name": "to-delete", + }, + ) + result = await branch_tool.execute( + arguments={ + "action": "delete", + "name": "to-delete", + }, + ) + assert not result.is_error + + async def test_force_delete(self, branch_tool: GitBranchTool) -> None: + await branch_tool.execute( + arguments={ + "action": "create", + "name": "force-del", + }, + ) + result = await branch_tool.execute( + arguments={ + "action": "delete", + "name": "force-del", + "force": True, + }, + ) + assert not result.is_error + + async def test_create_with_start_point(self, branch_tool: GitBranchTool) -> None: + result = await branch_tool.execute( + arguments={ + "action": "create", + "name": "from-head", + "start_point": "HEAD", + }, + ) + assert not result.is_error + + async def test_name_required_for_create(self, branch_tool: GitBranchTool) -> None: + result = await branch_tool.execute( + arguments={"action": "create"}, + ) + assert result.is_error + assert "required" in result.content.lower() + + async def test_name_required_for_switch(self, branch_tool: GitBranchTool) -> None: + result = await branch_tool.execute( + arguments={"action": "switch"}, + ) + assert result.is_error + + async def test_name_required_for_delete(self, branch_tool: GitBranchTool) -> None: + result = await branch_tool.execute( + arguments={"action": "delete"}, + ) + assert result.is_error + + async def test_switch_nonexistent_branch(self, branch_tool: GitBranchTool) -> None: + result = await branch_tool.execute( + arguments={ + "action": "switch", + "name": "no-such-branch", + }, + ) + assert result.is_error + + +# ── GitCommitTool ───────────────────────────────────────────────── + + +@pytest.mark.unit +class TestGitCommitTool: + """Tests for git_commit.""" + + async def test_commit_with_paths(self, git_repo_with_changes: Path) -> None: + tool = GitCommitTool(workspace=git_repo_with_changes) + result = await tool.execute( + arguments={ + "message": "add new file", + "paths": ["new_file.txt"], + }, + ) + assert not result.is_error + + async def test_commit_all(self, git_repo_with_changes: Path) -> None: + tool = GitCommitTool(workspace=git_repo_with_changes) + result = await tool.execute( + arguments={"message": "commit all", "all": True}, + ) + assert not result.is_error + + async def test_nothing_to_commit(self, commit_tool: GitCommitTool) -> None: + result = await commit_tool.execute( + arguments={"message": "empty"}, + ) + assert result.is_error + + async def test_path_traversal_in_commit(self, commit_tool: GitCommitTool) -> None: + result = await commit_tool.execute( + arguments={ + "message": "sneaky", + "paths": ["../../etc/passwd"], + }, + ) + assert result.is_error + + +# ── GitCloneTool ────────────────────────────────────────────────── + + +@pytest.mark.unit +class TestGitCloneTool: + """Tests for git_clone.""" + + async def test_clone_local_repo( + self, + git_repo: Path, + workspace: Path, + allow_local_clone: None, + ) -> None: + tool = GitCloneTool(workspace=workspace) + result = await tool.execute( + arguments={ + "url": str(git_repo), + "directory": "cloned", + }, + ) + assert not result.is_error + assert (workspace / "cloned" / "README.md").exists() + + async def test_clone_with_depth( + self, + git_repo: Path, + workspace: Path, + allow_local_clone: None, + ) -> None: + tool = GitCloneTool(workspace=workspace) + result = await tool.execute( + arguments={ + "url": str(git_repo), + "directory": "shallow", + "depth": 1, + }, + ) + assert not result.is_error + + async def test_clone_directory_outside_workspace( + self, + git_repo: Path, + workspace: Path, + allow_local_clone: None, + ) -> None: + tool = GitCloneTool(workspace=workspace) + result = await tool.execute( + arguments={ + "url": str(git_repo), + "directory": "../../outside", + }, + ) + assert result.is_error + + async def test_clone_invalid_url(self, clone_tool: GitCloneTool) -> None: + result = await clone_tool.execute( + arguments={"url": "not-a-real-url-at-all"}, + ) + assert result.is_error + assert "Invalid clone URL" in result.content + + async def test_clone_with_branch( + self, + git_repo: Path, + workspace: Path, + allow_local_clone: None, + ) -> None: + _run_git(["branch", "test-branch"], git_repo) + tool = GitCloneTool(workspace=workspace) + result = await tool.execute( + arguments={ + "url": str(git_repo), + "directory": "branch-clone", + "branch": "test-branch", + }, + ) + assert not result.is_error + + +# ── Security: flag injection prevention ─────────────────────────── + + +@pytest.mark.unit +class TestFlagInjectionPrevention: + """Refs and branch names starting with ``-`` must be rejected.""" + + async def test_log_ref_flag_blocked(self, log_tool: GitLogTool) -> None: + result = await log_tool.execute( + arguments={"ref": "--exec=malicious"}, + ) + assert result.is_error + assert "must not start with '-'" in result.content + + async def test_diff_ref1_flag_blocked(self, diff_tool: GitDiffTool) -> None: + result = await diff_tool.execute( + arguments={"ref1": "--upload-pack=evil"}, + ) + assert result.is_error + + async def test_diff_ref2_flag_blocked(self, diff_tool: GitDiffTool) -> None: + result = await diff_tool.execute( + arguments={"ref1": "HEAD", "ref2": "-c core.sshCommand=evil"}, + ) + assert result.is_error + + async def test_branch_name_flag_blocked(self, branch_tool: GitBranchTool) -> None: + result = await branch_tool.execute( + arguments={"action": "create", "name": "--set-upstream-to=evil"}, + ) + assert result.is_error + + async def test_branch_start_point_flag_blocked( + self, branch_tool: GitBranchTool + ) -> None: + result = await branch_tool.execute( + arguments={ + "action": "create", + "name": "ok-branch", + "start_point": "--exec=evil", + }, + ) + assert result.is_error + + async def test_switch_name_flag_blocked(self, branch_tool: GitBranchTool) -> None: + result = await branch_tool.execute( + arguments={"action": "switch", "name": "-c evil"}, + ) + assert result.is_error + + async def test_clone_branch_flag_blocked( + self, + clone_tool: GitCloneTool, + ) -> None: + result = await clone_tool.execute( + arguments={ + "url": "https://example.com/repo.git", + "branch": "--upload-pack=evil", + }, + ) + assert result.is_error + assert "must not start with '-'" in result.content + + +# ── Security: clone URL validation ─────────────────────────────── + + +@pytest.mark.unit +class TestCloneUrlValidation: + """Only remote URL schemes should be allowed for clone.""" + + async def test_local_path_blocked(self, clone_tool: GitCloneTool) -> None: + result = await clone_tool.execute( + arguments={"url": "/etc/passwd"}, + ) + assert result.is_error + assert "Invalid clone URL" in result.content + + async def test_file_scheme_blocked(self, clone_tool: GitCloneTool) -> None: + result = await clone_tool.execute( + arguments={"url": "file:///etc"}, + ) + assert result.is_error + + async def test_ext_protocol_blocked(self, clone_tool: GitCloneTool) -> None: + result = await clone_tool.execute( + arguments={"url": "ext::sh -c 'evil'"}, + ) + assert result.is_error + + async def test_https_allowed(self, clone_tool: GitCloneTool) -> None: + result = await clone_tool.execute( + arguments={"url": "https://example.com/repo.git"}, + ) + # URL is valid, clone will fail (no such host) but not from validation + assert "Invalid clone URL" not in result.content + + async def test_scp_syntax_allowed(self, clone_tool: GitCloneTool) -> None: + result = await clone_tool.execute( + arguments={"url": "git@github.com:user/repo.git"}, + ) + assert "Invalid clone URL" not in result.content + + async def test_relative_path_blocked(self, clone_tool: GitCloneTool) -> None: + result = await clone_tool.execute( + arguments={"url": "../outside-repo"}, + ) + assert result.is_error + + async def test_flag_url_blocked(self, clone_tool: GitCloneTool) -> None: + """URLs starting with '-' must be rejected (flag injection).""" + result = await clone_tool.execute( + arguments={"url": "-cfoo=bar@host:path"}, + ) + assert result.is_error + assert "Invalid clone URL" in result.content + + +# ── Edge cases: detached HEAD ───────────────────────────────────── + + +@pytest.mark.unit +class TestDetachedHead: + """Tools must work correctly in detached HEAD state.""" + + async def test_status_in_detached_head(self, detached_head_repo: Path) -> None: + tool = GitStatusTool(workspace=detached_head_repo) + result = await tool.execute(arguments={}) + assert not result.is_error + + async def test_log_in_detached_head(self, detached_head_repo: Path) -> None: + tool = GitLogTool(workspace=detached_head_repo) + result = await tool.execute(arguments={}) + assert not result.is_error + assert "initial commit" in result.content.lower() + + async def test_branch_list_in_detached_head(self, detached_head_repo: Path) -> None: + tool = GitBranchTool(workspace=detached_head_repo) + result = await tool.execute(arguments={"action": "list"}) + assert not result.is_error + assert "detached" in result.content.lower() or "HEAD" in result.content + + +# ── Edge cases: merge conflicts ─────────────────────────────────── + + +@pytest.mark.unit +class TestMergeConflict: + """Tools must report merge conflict state clearly.""" + + async def test_status_shows_conflict(self, merge_conflict_repo: Path) -> None: + tool = GitStatusTool(workspace=merge_conflict_repo) + result = await tool.execute(arguments={}) + assert not result.is_error + content = result.content.lower() + has_conflict_marker = ( + "unmerged" in content or "both modified" in content or "readme" in content + ) + assert has_conflict_marker + + async def test_commit_without_staging_fails_during_conflict( + self, merge_conflict_repo: Path + ) -> None: + """Committing without staging unmerged files must fail.""" + tool = GitCommitTool(workspace=merge_conflict_repo) + result = await tool.execute( + arguments={"message": "should fail"}, + ) + assert result.is_error + + +# ── _run_git error paths ───────────────────────────────────────── + + +@pytest.mark.unit +class TestRunGitErrorPaths: + """Unit tests for _run_git timeout and OSError handling.""" + + async def test_timeout_kills_process(self, status_tool: GitStatusTool) -> None: + """Timeout kills the process and returns an error result.""" + calls = 0 + + async def slow_communicate() -> tuple[bytes, bytes]: + nonlocal calls + calls += 1 + if calls == 1: + await asyncio.sleep(999) + return b"", b"" + + mock_proc = MagicMock() + mock_proc.communicate = slow_communicate + mock_proc.kill = MagicMock() + + with patch( + "asyncio.create_subprocess_exec", + return_value=mock_proc, + ): + result = await status_tool._run_git(["status"], deadline=0.01) + + assert result.is_error + assert "timed out" in result.content + mock_proc.kill.assert_called_once() + + async def test_oserror_returns_error(self, status_tool: GitStatusTool) -> None: + """OSError when git binary not found returns error result.""" + with patch( + "asyncio.create_subprocess_exec", + side_effect=FileNotFoundError("git not found"), + ): + result = await status_tool._run_git(["status"]) + + assert result.is_error + assert "Failed to start git" in result.content + + +# ── Error handling edge cases ───────────────────────────────────── + + +@pytest.mark.unit +class TestErrorHandling: + """Edge cases and error conditions.""" + + async def test_not_a_git_repo(self, workspace: Path) -> None: + tool = GitStatusTool(workspace=workspace) + result = await tool.execute(arguments={}) + assert result.is_error + + def test_workspace_property(self, git_repo: Path) -> None: + tool = GitStatusTool(workspace=git_repo) + assert tool.workspace == git_repo.resolve() + + async def test_to_definition(self, git_repo: Path) -> None: + tool = GitStatusTool(workspace=git_repo) + defn = tool.to_definition() + assert defn.name == "git_status" + assert defn.description