Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ src/ai_company/
providers/ # LLM provider abstraction (LiteLLM adapter)
security/ # SecOps agent, approval gates, audit
templates/ # Pre-built company templates and builder
tools/ # Tool registry, built-in git tools, MCP integration, role-based access, sandboxing
tools/ # Tool registry, built-in tools (file_system/, git), MCP integration, role-based access, sandboxing
```

## Shell Usage
Expand Down
14 changes: 11 additions & 3 deletions DESIGN_SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -1650,7 +1650,7 @@ When coordination metrics collection is enabled, the system can optionally class

| Category | Tools | Typical Roles |
|----------|-------|---------------|
| **File System** | Read, write, edit, delete files | All developers, writers |
| **File System** | Read, write, edit, list, delete files | All developers, writers |
| **Code Execution** | Run code in sandboxed environments | Developers, QA |
| **Version Control** | Git operations, PR management | Developers, DevOps |
| **Web** | HTTP requests, web scraping, search | Researchers, analysts |
Expand Down Expand Up @@ -2375,7 +2375,15 @@ ai-company/
│ │ │ ├── protocol.py # SandboxBackend protocol
│ │ │ ├── subprocess.py # SubprocessSandbox (default for low-risk)
│ │ │ └── docker.py # DockerSandbox (for code_runner, terminal)
│ │ ├── file_system.py # File operations (M3)
│ │ ├── file_system/ # Built-in file system tools
│ │ │ ├── __init__.py # Package exports
│ │ │ ├── _base_fs_tool.py # BaseFileSystemTool ABC
│ │ │ ├── _path_validator.py # Workspace path validation
│ │ │ ├── delete_file.py # DeleteFileTool
│ │ │ ├── edit_file.py # EditFileTool
│ │ │ ├── list_directory.py # ListDirectoryTool
│ │ │ ├── read_file.py # ReadFileTool
│ │ │ └── write_file.py # WriteFileTool
│ │ ├── _git_base.py # Base class for git tools (workspace, subprocess)
│ │ ├── git_tools.py # Git operations — 6 built-in tools
│ │ ├── code_runner.py # Code execution (M3)
Expand Down Expand Up @@ -2459,7 +2467,7 @@ These conventions were established during the M0–M2+ review cycle. **Adopted**
| **Event constants** | Adopted (per-domain) | Per-domain submodules under `events/` package (e.g. `events.provider`, `events.budget`). Import directly: `from ai_company.observability.events.<domain> import CONSTANT` | Split by domain for discoverability, co-location with domain logic, and reduced merge conflicts as constants grow. `__init__.py` serves as package marker with usage documentation; no re-exports. |
| **Parallel tool execution** | Adopted (M2.5) | `asyncio.TaskGroup` in `ToolInvoker.invoke_all` with optional `max_concurrency` semaphore | Structured concurrency with proper cancellation semantics. Fatal errors collected via guarded wrapper and re-raised after all tasks complete. |
| **Tool permission checking** | Adopted (M3) | `ToolPermissionChecker` enforces category-level gating based on `ToolAccessLevel` (sandboxed → restricted → standard → elevated, plus custom). Priority-based resolution: denied list → allowed list → level categories → deny. Case-insensitive name matching. `ToolInvoker` filters definitions for prompt and checks at invocation time. | Defense-in-depth: agents only see permitted tools in the LLM prompt, and invocations are re-checked at execution time. Explicit allow/deny lists provide per-agent overrides. See §11.1.1. |
| **Tool sandboxing** | Planned (M3) | Layered `SandboxBackend` protocol: `SubprocessSandbox` for low-risk tools (file, git), `DockerSandbox` for high-risk tools (code_runner, terminal, web, database). `K8sSandbox` planned for future container deployments. | Risk-proportionate isolation. Docker optional — only needed for code execution and network-sensitive tools. Pluggable protocol enables seamless migration to K8s per-agent pods in Phase 3-4. See §11.1.2. |
| **Tool sandboxing** | Partial (M3) | File system tools use in-process `PathValidator` for workspace-scoped path validation (symlink resolution + containment check). `BaseFileSystemTool` ABC provides shared `ToolCategory.FILE_SYSTEM` and `PathValidator` integration — all file system tools extend this base. `SandboxBackend` protocol with `SubprocessSandbox` / `DockerSandbox` remains planned for git, code_runner, terminal, web, and database tools. `K8sSandbox` planned for future container deployments. | File system tools use defence-in-depth path validation; heavier sandbox isolation reserved for higher-risk tool categories (code execution, network). See §11.1.2. |
| **Crash recovery** | Planned (M3) | Pluggable `RecoveryStrategy` protocol. M3: `FailAndReassignStrategy` (catch at engine boundary, log snapshot, mark FAILED, reassign). M4/M5: `CheckpointStrategy` (persist `AgentContext` per turn, resume from last checkpoint). | Immutable `model_copy` pattern makes checkpoint serialization trivial to add later. Fail-and-reassign is sufficient for short MVP tasks. See §6.6. |
| **Agent behavior testing** | Planned (M3) | Scripted `FakeProvider` for unit tests (deterministic turn sequences); behavioral outcome assertions for integration tests (task completed, tools called, cost within budget). | Leverages existing `FakeProvider` and `CompletionResponseFactory` fixtures. Precise engine testing without brittle response-matching at integration level. |
| **LLM call analytics** | Planned (incremental) | M3: proxy metrics (`turns_per_task`, `tokens_per_task`). M4: call categorization (`productive`, `coordination`, `system`) + orchestration ratio. M5+: full analytics (retry tracking, latency, cache hits, per-provider comparison). | Append-only, never blocks execution. Builds on existing `CostRecord` infrastructure. Detects orchestration overhead early. See §10.5. |
Expand Down
17 changes: 17 additions & 0 deletions src/ai_company/observability/events/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,20 @@
TOOL_PERMISSION_DENIED: Final[str] = "tool.permission.denied"
TOOL_PERMISSION_CHECKER_CREATED: Final[str] = "tool.permission.checker_created"
TOOL_PERMISSION_FILTERED: Final[str] = "tool.permission.filtered"

# ── File system tool events ──────────────────────────────────────
TOOL_FS_READ: Final[str] = "tool.fs.read"
TOOL_FS_WRITE: Final[str] = "tool.fs.write"
TOOL_FS_EDIT: Final[str] = "tool.fs.edit"
TOOL_FS_EDIT_NOT_FOUND: Final[str] = "tool.fs.edit_not_found"
TOOL_FS_LIST: Final[str] = "tool.fs.list"
TOOL_FS_DELETE: Final[str] = "tool.fs.delete"
TOOL_FS_PATH_VIOLATION: Final[str] = "tool.fs.path_violation"
TOOL_FS_BINARY_DETECTED: Final[str] = "tool.fs.binary_detected"
TOOL_FS_SIZE_EXCEEDED: Final[str] = "tool.fs.size_exceeded"
TOOL_FS_ERROR: Final[str] = "tool.fs.error"
TOOL_FS_STAT_FAILED: Final[str] = "tool.fs.stat_failed"
TOOL_FS_WORKSPACE_INVALID: Final[str] = "tool.fs.workspace_invalid"
TOOL_FS_PARENT_NOT_FOUND: Final[str] = "tool.fs.parent_not_found"
TOOL_FS_GLOB_REJECTED: Final[str] = "tool.fs.glob_rejected"
TOOL_FS_NOOP: Final[str] = "tool.fs.noop"
16 changes: 16 additions & 0 deletions src/ai_company/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@
ToolPermissionDeniedError,
)
from .examples.echo import EchoTool
from .file_system import (
BaseFileSystemTool,
DeleteFileTool,
EditFileTool,
ListDirectoryTool,
PathValidator,
ReadFileTool,
WriteFileTool,
)
from .git_tools import (
GitBranchTool,
GitCloneTool,
Expand All @@ -22,14 +31,20 @@
from .registry import ToolRegistry

__all__ = [
"BaseFileSystemTool",
"BaseTool",
"DeleteFileTool",
"EchoTool",
"EditFileTool",
"GitBranchTool",
"GitCloneTool",
"GitCommitTool",
"GitDiffTool",
"GitLogTool",
"GitStatusTool",
"ListDirectoryTool",
"PathValidator",
"ReadFileTool",
"ToolError",
"ToolExecutionError",
"ToolExecutionResult",
Expand All @@ -39,4 +54,5 @@
"ToolPermissionChecker",
"ToolPermissionDeniedError",
"ToolRegistry",
"WriteFileTool",
]
23 changes: 23 additions & 0 deletions src/ai_company/tools/file_system/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Built-in file system tools for workspace interaction.

Provides tools for reading, writing, editing, listing, and deleting
files within a sandboxed workspace directory.
"""

from ai_company.tools.file_system._base_fs_tool import BaseFileSystemTool
from ai_company.tools.file_system._path_validator import PathValidator
from ai_company.tools.file_system.delete_file import DeleteFileTool
from ai_company.tools.file_system.edit_file import EditFileTool
from ai_company.tools.file_system.list_directory import ListDirectoryTool
from ai_company.tools.file_system.read_file import ReadFileTool
from ai_company.tools.file_system.write_file import WriteFileTool

__all__ = [
"BaseFileSystemTool",
"DeleteFileTool",
"EditFileTool",
"ListDirectoryTool",
"PathValidator",
"ReadFileTool",
"WriteFileTool",
]
78 changes: 78 additions & 0 deletions src/ai_company/tools/file_system/_base_fs_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Base class for file system tools.

Provides the common ``ToolCategory.FILE_SYSTEM`` category and a
``PathValidator`` instance bound to the workspace root.
"""

from abc import ABC
from typing import TYPE_CHECKING, Any

from ai_company.core.enums import ToolCategory
from ai_company.tools.base import BaseTool
from ai_company.tools.file_system._path_validator import PathValidator

if TYPE_CHECKING:
from pathlib import Path


def _map_os_error(exc: OSError, user_path: str, verb: str) -> tuple[str, str]:
"""Map an OS error to ``(log_key, user_message)`` for FS operations.

Args:
exc: The caught OS-level exception.
user_path: The original user-supplied path string.
verb: Action verb for the fallback message
(e.g. ``"reading"``, ``"editing"``).

Returns:
A two-tuple of (structured log key, human-readable message).
"""
if isinstance(exc, FileNotFoundError):
return "not_found", f"File not found: {user_path}"
if isinstance(exc, IsADirectoryError):
return "is_directory", f"Path is a directory, not a file: {user_path}"
if isinstance(exc, PermissionError):
return "permission_denied", f"Permission denied: {user_path}"
return "os_error", f"OS error {verb} file '{user_path}': {exc}"


class BaseFileSystemTool(BaseTool, ABC):
"""Abstract base for all file system tools.

Sets ``category=ToolCategory.FILE_SYSTEM`` and holds a shared
``PathValidator`` for workspace-scoped path resolution.
"""

def __init__(
self,
*,
workspace_root: Path,
name: str,
description: str = "",
parameters_schema: dict[str, Any] | None = None,
) -> None:
"""Initialize with a workspace root and tool metadata.

Args:
workspace_root: Root directory bounding file access.
name: Tool name.
description: Human-readable description.
parameters_schema: JSON Schema for tool parameters.
"""
super().__init__(
name=name,
description=description,
category=ToolCategory.FILE_SYSTEM,
parameters_schema=parameters_schema,
)
self._path_validator = PathValidator(workspace_root)

@property
def workspace_root(self) -> Path:
"""The resolved workspace root directory."""
return self._path_validator.workspace_root

@property
def path_validator(self) -> PathValidator:
"""The path validator instance."""
return self._path_validator
126 changes: 126 additions & 0 deletions src/ai_company/tools/file_system/_path_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""Workspace path validation for file system tools.

All file system tools delegate path validation to ``PathValidator``,
which ensures that resolved paths remain within the configured
workspace root. This prevents path-traversal attacks via ``../``,
symlinks, or absolute paths outside the workspace.
"""

from pathlib import PurePosixPath, PureWindowsPath
from typing import TYPE_CHECKING

from ai_company.observability import get_logger
from ai_company.observability.events.tool import (
TOOL_FS_PARENT_NOT_FOUND,
TOOL_FS_PATH_VIOLATION,
TOOL_FS_WORKSPACE_INVALID,
)

if TYPE_CHECKING:
from pathlib import Path

logger = get_logger(__name__)


class PathValidator:
"""Validates and resolves paths against a workspace root.

All resolved paths must remain within ``workspace_root``.
Symlinks are resolved before checking, so a symlink pointing
outside the workspace is rejected.

Attributes:
workspace_root: The resolved workspace root directory.
"""

def __init__(self, workspace_root: Path) -> None:
"""Initialize with a workspace root directory.

Args:
workspace_root: Root directory that bounds all file access.

Raises:
ValueError: If workspace_root does not exist or is not a
directory.
"""
resolved = workspace_root.resolve()
if not resolved.is_dir():
logger.warning(
TOOL_FS_WORKSPACE_INVALID,
workspace_root=str(workspace_root),
)
msg = f"Workspace root is not an existing directory: {workspace_root}"
raise ValueError(msg)
self._workspace_root = resolved

@property
def workspace_root(self) -> Path:
"""The resolved workspace root directory."""
return self._workspace_root

def validate(self, path: str) -> Path:
"""Resolve *path* against workspace root and validate containment.

Args:
path: A relative or absolute path string from the user.

Returns:
The resolved ``Path`` guaranteed to be within the workspace.

Raises:
ValueError: If the resolved path escapes the workspace.
"""
# Reject absolute paths outright — agents must use relative paths.
if PurePosixPath(path).is_absolute() or PureWindowsPath(path).is_absolute():
logger.warning(TOOL_FS_PATH_VIOLATION, user_path=path)
msg = f"Absolute paths not allowed: {path}"
raise ValueError(msg)

# NOTE: There is an inherent TOCTOU gap between this validation
# and the actual file operation (which runs in asyncio.to_thread).
# A concurrent process could swap in a symlink between validation
# and use. Full mitigation requires OS-level sandboxing (e.g.
# openat2 RESOLVE_BENEATH on Linux). User-space path validation
# is a best-effort defence-in-depth layer.
try:
resolved = (self._workspace_root / path).resolve()
except OSError as exc:
logger.warning(
TOOL_FS_PATH_VIOLATION,
user_path=path,
error=str(exc),
)
msg = f"Invalid path: {path} ({exc})"
raise ValueError(msg) from exc

if not resolved.is_relative_to(self._workspace_root):
logger.warning(
TOOL_FS_PATH_VIOLATION,
user_path=path,
)
msg = f"Path escapes workspace: {path}"
raise ValueError(msg)
return resolved
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def validate_parent_exists(self, path: str) -> Path:
"""Like ``validate`` but also checks that the parent directory exists.

Args:
path: A relative or absolute path string from the user.

Returns:
The resolved ``Path`` within the workspace whose parent exists.

Raises:
ValueError: If the resolved path escapes the workspace or
the parent directory does not exist.
"""
resolved = self.validate(path)
if not resolved.parent.exists():
logger.warning(
TOOL_FS_PARENT_NOT_FOUND,
path=path,
)
msg = f"Parent directory does not exist: {path}"
raise ValueError(msg)
Comment on lines +115 to +125

Copilot AI Mar 7, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate_parent_exists() only checks resolved.parent.exists(). If the parent path exists but is a file (e.g., writing to hello.txt/new.txt), validation passes and the write later fails with a generic OS error. Consider checking resolved.parent.is_dir() and returning a clearer error message (and event) for “parent is not a directory”.

Suggested change
ValueError: If the resolved path escapes the workspace or
the parent directory does not exist.
"""
resolved = self.validate(path)
if not resolved.parent.exists():
logger.warning(
TOOL_FS_PARENT_NOT_FOUND,
path=path,
)
msg = f"Parent directory does not exist: {path}"
raise ValueError(msg)
ValueError: If the resolved path escapes the workspace, the
parent directory does not exist, or the parent is not a
directory.
"""
resolved = self.validate(path)
parent = resolved.parent
if not parent.exists():
logger.warning(
TOOL_FS_PARENT_NOT_FOUND,
path=path,
)
msg = f"Parent directory does not exist: {path}"
raise ValueError(msg)
if not parent.is_dir():
logger.warning(
TOOL_FS_PARENT_NOT_FOUND,
path=path,
)
msg = f"Parent path is not a directory: {path}"
raise ValueError(msg)

Copilot uses AI. Check for mistakes.
return resolved
Loading