Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ src/ai_company/
providers/ # LLM provider abstraction (LiteLLM adapter)
security/ # SecOps agent, approval gates, audit
templates/ # Pre-built company templates and builder
tools/ # Tool registry, MCP integration, role-based access, sandboxing
tools/ # Tool registry, MCP integration, role-based access, built-in tools (file_system/)
```

## Shell Usage
Expand Down
12 changes: 10 additions & 2 deletions DESIGN_SPEC.md
Original file line number Diff line number Diff line change
Expand Up @@ -2344,7 +2344,15 @@ ai-company/
│ │ │ ├── protocol.py # SandboxBackend protocol
│ │ │ ├── subprocess.py # SubprocessSandbox (default for low-risk)
│ │ │ └── docker.py # DockerSandbox (for code_runner, terminal)
│ │ ├── file_system.py # File operations (M3)
│ │ ├── file_system/ # Built-in file system tools
│ │ │ ├── __init__.py # Package exports
│ │ │ ├── _base_fs_tool.py # BaseFileSystemTool ABC
│ │ │ ├── _path_validator.py # Workspace path validation
│ │ │ ├── delete_file.py # DeleteFileTool
│ │ │ ├── edit_file.py # EditFileTool
│ │ │ ├── list_directory.py # ListDirectoryTool
│ │ │ ├── read_file.py # ReadFileTool
│ │ │ └── write_file.py # WriteFileTool
│ │ ├── git_tools.py # Git operations (M3)
│ │ ├── code_runner.py # Code execution (M3)
│ │ ├── web_tools.py # HTTP, search (M3)
Expand Down Expand Up @@ -2427,7 +2435,7 @@ These conventions were established during the M0–M2+ review cycle. **Adopted**
| **Event constants** | Adopted (per-domain) | Per-domain submodules under `events/` package (e.g. `events.provider`, `events.budget`). Import directly: `from ai_company.observability.events.<domain> import CONSTANT` | Split by domain for discoverability, co-location with domain logic, and reduced merge conflicts as constants grow. `__init__.py` serves as package marker with usage documentation; no re-exports. |
| **Parallel tool execution** | Adopted (M2.5) | `asyncio.TaskGroup` in `ToolInvoker.invoke_all` with optional `max_concurrency` semaphore | Structured concurrency with proper cancellation semantics. Fatal errors collected via guarded wrapper and re-raised after all tasks complete. |
| **Tool permission checking** | Adopted (M3) | `ToolPermissionChecker` enforces category-level gating based on `ToolAccessLevel` (sandboxed → restricted → standard → elevated, plus custom). Priority-based resolution: denied list → allowed list → level categories → deny. Case-insensitive name matching. `ToolInvoker` filters definitions for prompt and checks at invocation time. | Defense-in-depth: agents only see permitted tools in the LLM prompt, and invocations are re-checked at execution time. Explicit allow/deny lists provide per-agent overrides. See §11.1.1. |
| **Tool sandboxing** | Planned (M3) | Layered `SandboxBackend` protocol: `SubprocessSandbox` for low-risk tools (file, git), `DockerSandbox` for high-risk tools (code_runner, terminal, web, database). `K8sSandbox` planned for future container deployments. | Risk-proportionate isolation. Docker optional — only needed for code execution and network-sensitive tools. Pluggable protocol enables seamless migration to K8s per-agent pods in Phase 3-4. See §11.1.2. |
| **Tool sandboxing** | Partial (M3) | File system tools use in-process `PathValidator` for workspace-scoped path validation (symlink resolution + containment check). `BaseFileSystemTool` ABC provides shared `ToolCategory.FILE_SYSTEM` and `PathValidator` integration — all file system tools extend this base. `SandboxBackend` protocol with `SubprocessSandbox` / `DockerSandbox` remains planned for git, code_runner, terminal, web, and database tools. `K8sSandbox` planned for future container deployments. | File system tools use defence-in-depth path validation; heavier sandbox isolation reserved for higher-risk tool categories (code execution, network). See §11.1.2. |
| **Crash recovery** | Planned (M3) | Pluggable `RecoveryStrategy` protocol. M3: `FailAndReassignStrategy` (catch at engine boundary, log snapshot, mark FAILED, reassign). M4/M5: `CheckpointStrategy` (persist `AgentContext` per turn, resume from last checkpoint). | Immutable `model_copy` pattern makes checkpoint serialization trivial to add later. Fail-and-reassign is sufficient for short MVP tasks. See §6.6. |
| **Agent behavior testing** | Planned (M3) | Scripted `FakeProvider` for unit tests (deterministic turn sequences); behavioral outcome assertions for integration tests (task completed, tools called, cost within budget). | Leverages existing `FakeProvider` and `CompletionResponseFactory` fixtures. Precise engine testing without brittle response-matching at integration level. |
| **LLM call analytics** | Planned (incremental) | M3: proxy metrics (`turns_per_task`, `tokens_per_task`). M4: call categorization (`productive`, `coordination`, `system`) + orchestration ratio. M5+: full analytics (retry tracking, latency, cache hits, per-provider comparison). | Append-only, never blocks execution. Builds on existing `CostRecord` infrastructure. Detects orchestration overhead early. See §10.5. |
Expand Down
17 changes: 17 additions & 0 deletions src/ai_company/observability/events/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,20 @@
TOOL_PERMISSION_DENIED: Final[str] = "tool.permission.denied"
TOOL_PERMISSION_CHECKER_CREATED: Final[str] = "tool.permission.checker_created"
TOOL_PERMISSION_FILTERED: Final[str] = "tool.permission.filtered"

# ── File system tool events ──────────────────────────────────────
TOOL_FS_READ: Final[str] = "tool.fs.read"
TOOL_FS_WRITE: Final[str] = "tool.fs.write"
TOOL_FS_EDIT: Final[str] = "tool.fs.edit"
TOOL_FS_EDIT_NOT_FOUND: Final[str] = "tool.fs.edit_not_found"
TOOL_FS_LIST: Final[str] = "tool.fs.list"
TOOL_FS_DELETE: Final[str] = "tool.fs.delete"
TOOL_FS_PATH_VIOLATION: Final[str] = "tool.fs.path_violation"
TOOL_FS_BINARY_DETECTED: Final[str] = "tool.fs.binary_detected"
TOOL_FS_SIZE_EXCEEDED: Final[str] = "tool.fs.size_exceeded"
TOOL_FS_ERROR: Final[str] = "tool.fs.error"
TOOL_FS_STAT_FAILED: Final[str] = "tool.fs.stat_failed"
TOOL_FS_WORKSPACE_INVALID: Final[str] = "tool.fs.workspace_invalid"
TOOL_FS_PARENT_NOT_FOUND: Final[str] = "tool.fs.parent_not_found"
TOOL_FS_GLOB_REJECTED: Final[str] = "tool.fs.glob_rejected"
TOOL_FS_NOOP: Final[str] = "tool.fs.noop"
16 changes: 16 additions & 0 deletions src/ai_company/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,28 @@
ToolPermissionDeniedError,
)
from .examples.echo import EchoTool
from .file_system import (
BaseFileSystemTool,
DeleteFileTool,
EditFileTool,
ListDirectoryTool,
PathValidator,
ReadFileTool,
WriteFileTool,
)
from .invoker import ToolInvoker
from .permissions import ToolPermissionChecker
from .registry import ToolRegistry

__all__ = [
"BaseFileSystemTool",
"BaseTool",
"DeleteFileTool",
"EchoTool",
"EditFileTool",
"ListDirectoryTool",
"PathValidator",
"ReadFileTool",
"ToolError",
"ToolExecutionError",
"ToolExecutionResult",
Expand All @@ -25,4 +40,5 @@
"ToolPermissionChecker",
"ToolPermissionDeniedError",
"ToolRegistry",
"WriteFileTool",
]
23 changes: 23 additions & 0 deletions src/ai_company/tools/file_system/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Built-in file system tools for workspace interaction.

Provides tools for reading, writing, editing, listing, and deleting
files within a sandboxed workspace directory.
"""

from ai_company.tools.file_system._base_fs_tool import BaseFileSystemTool
from ai_company.tools.file_system._path_validator import PathValidator
from ai_company.tools.file_system.delete_file import DeleteFileTool
from ai_company.tools.file_system.edit_file import EditFileTool
from ai_company.tools.file_system.list_directory import ListDirectoryTool
from ai_company.tools.file_system.read_file import ReadFileTool
from ai_company.tools.file_system.write_file import WriteFileTool

__all__ = [
"BaseFileSystemTool",
"DeleteFileTool",
"EditFileTool",
"ListDirectoryTool",
"PathValidator",
"ReadFileTool",
"WriteFileTool",
]
80 changes: 80 additions & 0 deletions src/ai_company/tools/file_system/_base_fs_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""Base class for file system tools.

Provides the common ``ToolCategory.FILE_SYSTEM`` category and a
``PathValidator`` instance bound to the workspace root.
"""

from abc import ABC
from typing import TYPE_CHECKING, Any

from ai_company.core.enums import ToolCategory
from ai_company.observability import get_logger
from ai_company.tools.base import BaseTool
from ai_company.tools.file_system._path_validator import PathValidator

if TYPE_CHECKING:
from pathlib import Path

logger = get_logger(__name__)


Copilot AI Mar 7, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_base_fs_tool.py defines logger = get_logger(__name__) but never uses it. This will trip linting (unused import/variable). Remove the logger (and get_logger import) or use it for shared logging in this module.

Suggested change
from ai_company.observability import get_logger
from ai_company.tools.base import BaseTool
from ai_company.tools.file_system._path_validator import PathValidator
if TYPE_CHECKING:
from pathlib import Path
logger = get_logger(__name__)
from ai_company.tools.base import BaseTool
from ai_company.tools.file_system._path_validator import PathValidator
if TYPE_CHECKING:
from pathlib import Path

Copilot uses AI. Check for mistakes.
def _map_os_error(exc: OSError, user_path: str, verb: str) -> tuple[str, str]:
"""Map an OS error to ``(log_key, user_message)`` for FS operations.

Args:
exc: The caught OS-level exception.
user_path: The original user-supplied path string.
verb: Action verb for the fallback message (e.g. ``"reading"``).

Returns:
A two-tuple of (structured log key, human-readable message).
"""
if isinstance(exc, FileNotFoundError):
return "not_found", f"File not found: {user_path}"
if isinstance(exc, IsADirectoryError):
return "is_directory", f"Path is a directory, not a file: {user_path}"
if isinstance(exc, PermissionError):
return "permission_denied", f"Permission denied: {user_path}"
return str(exc), f"OS error {verb} file: {user_path}"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The fallback case in _map_os_error uses str(exc) as the log_key. This is not ideal for structured logging, as exception messages can be variable and are not suitable for aggregation. Using the exception's class name would provide a more consistent key for log analysis. Additionally, the user-facing message could include the exception details to be more informative.

Suggested change
return str(exc), f"OS error {verb} file: {user_path}"
return exc.__class__.__name__, f"OS error {verb} file '{user_path}': {exc}"



class BaseFileSystemTool(BaseTool, ABC):
"""Abstract base for all file system tools.

Sets ``category=ToolCategory.FILE_SYSTEM`` and holds a shared
``PathValidator`` for workspace-scoped path resolution.
"""

def __init__(
self,
*,
workspace_root: Path,
name: str,
description: str = "",
parameters_schema: dict[str, Any] | None = None,
) -> None:
"""Initialize with a workspace root and tool metadata.

Args:
workspace_root: Root directory bounding file access.
name: Tool name.
description: Human-readable description.
parameters_schema: JSON Schema for tool parameters.
"""
super().__init__(
name=name,
description=description,
category=ToolCategory.FILE_SYSTEM,
parameters_schema=parameters_schema,
)
self._path_validator = PathValidator(workspace_root)

@property
def workspace_root(self) -> Path:
"""The resolved workspace root directory."""
return self._path_validator.workspace_root

@property
def path_validator(self) -> PathValidator:
"""The path validator instance."""
return self._path_validator
109 changes: 109 additions & 0 deletions src/ai_company/tools/file_system/_path_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""Workspace path validation for file system tools.

All file system tools delegate path validation to ``PathValidator``,
which ensures that resolved paths remain within the configured
workspace root. This prevents path-traversal attacks via ``../``,
symlinks, or absolute paths outside the workspace.
"""

from typing import TYPE_CHECKING

from ai_company.observability import get_logger
from ai_company.observability.events.tool import (
TOOL_FS_PARENT_NOT_FOUND,
TOOL_FS_PATH_VIOLATION,
TOOL_FS_WORKSPACE_INVALID,
)

if TYPE_CHECKING:
from pathlib import Path

logger = get_logger(__name__)


class PathValidator:
"""Validates and resolves paths against a workspace root.

All resolved paths must remain within ``workspace_root``.
Symlinks are resolved before checking, so a symlink pointing
outside the workspace is rejected.

Attributes:
workspace_root: The resolved workspace root directory.
"""

def __init__(self, workspace_root: Path) -> None:
"""Initialize with a workspace root directory.

Args:
workspace_root: Root directory that bounds all file access.

Raises:
ValueError: If workspace_root does not exist or is not a
directory.
"""
resolved = workspace_root.resolve()
if not resolved.is_dir():
logger.warning(
TOOL_FS_WORKSPACE_INVALID,
workspace_root=str(workspace_root),
)
msg = f"Workspace root is not an existing directory: {workspace_root}"
raise ValueError(msg)
self._workspace_root = resolved

@property
def workspace_root(self) -> Path:
"""The resolved workspace root directory."""
return self._workspace_root

def validate(self, path: str) -> Path:
"""Resolve *path* against workspace root and validate containment.

Args:
path: A relative or absolute path string from the user.

Returns:
The resolved ``Path`` guaranteed to be within the workspace.

Raises:
ValueError: If the resolved path escapes the workspace.
"""
# NOTE: There is an inherent TOCTOU gap between this validation
# and the actual file operation (which runs in asyncio.to_thread).
# A concurrent process could swap in a symlink between validation
# and use. Full mitigation requires OS-level sandboxing (e.g.
# openat2 RESOLVE_BENEATH on Linux). User-space path validation
# is a best-effort defence-in-depth layer.
resolved = (self._workspace_root / path).resolve()
if not resolved.is_relative_to(self._workspace_root):
logger.warning(
TOOL_FS_PATH_VIOLATION,
user_path=path,
)
msg = f"Path escapes workspace: {path}"
raise ValueError(msg)
return resolved
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def validate_parent_exists(self, path: str) -> Path:
"""Like ``validate`` but also checks that the parent directory exists.

Args:
path: A relative or absolute path string from the user.

Returns:
The resolved ``Path`` within the workspace whose parent exists.

Raises:
ValueError: If the resolved path escapes the workspace or
the parent directory does not exist.
"""
resolved = self.validate(path)
if not resolved.parent.exists():
logger.warning(
TOOL_FS_PARENT_NOT_FOUND,
path=path,
)
msg = f"Parent directory does not exist: {path}"
raise ValueError(msg)
Comment on lines +115 to +125

Copilot AI Mar 7, 2026

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validate_parent_exists() only checks resolved.parent.exists(). If the parent path exists but is a file (e.g., writing to hello.txt/new.txt), validation passes and the write later fails with a generic OS error. Consider checking resolved.parent.is_dir() and returning a clearer error message (and event) for “parent is not a directory”.

Suggested change
ValueError: If the resolved path escapes the workspace or
the parent directory does not exist.
"""
resolved = self.validate(path)
if not resolved.parent.exists():
logger.warning(
TOOL_FS_PARENT_NOT_FOUND,
path=path,
)
msg = f"Parent directory does not exist: {path}"
raise ValueError(msg)
ValueError: If the resolved path escapes the workspace, the
parent directory does not exist, or the parent is not a
directory.
"""
resolved = self.validate(path)
parent = resolved.parent
if not parent.exists():
logger.warning(
TOOL_FS_PARENT_NOT_FOUND,
path=path,
)
msg = f"Parent directory does not exist: {path}"
raise ValueError(msg)
if not parent.is_dir():
logger.warning(
TOOL_FS_PARENT_NOT_FOUND,
path=path,
)
msg = f"Parent path is not a directory: {path}"
raise ValueError(msg)

Copilot uses AI. Check for mistakes.
return resolved
Loading