From 4098a0071463feb97d6b18da341804315e5306c2 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 7 Mar 2026 08:56:19 +0100 Subject: [PATCH 1/3] feat: implement built-in file system tools (#18) Add 5 file system tools (read_file, write_file, edit_file, list_directory, delete_file) as a tools/file_system/ subpackage. - PathValidator enforces workspace-scoped path safety (symlink-aware) - BaseFileSystemTool sets FILE_SYSTEM category + shared validator - All tools use asyncio.to_thread for non-blocking file I/O - ReadFileTool: line-range selection, 1MB size guard, binary detection - WriteFileTool: create/overwrite with optional parent dir creation - EditFileTool: first-occurrence search-and-replace with LLM context - ListDirectoryTool: glob filtering, recursive listing, 1000-entry cap - DeleteFileTool: files only, require_elevated=True for permission gating - 10 structured observability event constants (TOOL_FS_*) - 59 unit tests covering happy paths, error handling, and security Closes #18 Co-Authored-By: Claude Opus 4.6 --- src/ai_company/observability/events/tool.py | 12 ++ src/ai_company/tools/__init__.py | 16 ++ src/ai_company/tools/file_system/__init__.py | 23 +++ .../tools/file_system/_base_fs_tool.py | 57 ++++++ .../tools/file_system/_path_validator.py | 91 +++++++++ .../tools/file_system/delete_file.py | 127 ++++++++++++ src/ai_company/tools/file_system/edit_file.py | 193 ++++++++++++++++++ .../tools/file_system/list_directory.py | 189 +++++++++++++++++ src/ai_company/tools/file_system/read_file.py | 180 ++++++++++++++++ .../tools/file_system/write_file.py | 147 +++++++++++++ tests/unit/tools/file_system/__init__.py | 0 tests/unit/tools/file_system/conftest.py | 55 +++++ .../tools/file_system/test_delete_file.py | 65 ++++++ .../unit/tools/file_system/test_edit_file.py | 135 ++++++++++++ .../tools/file_system/test_list_directory.py | 91 +++++++++ .../tools/file_system/test_path_validator.py | 83 ++++++++ .../unit/tools/file_system/test_read_file.py | 116 +++++++++++ .../unit/tools/file_system/test_write_file.py | 85 ++++++++ 18 files changed, 1665 insertions(+) create mode 100644 src/ai_company/tools/file_system/__init__.py create mode 100644 src/ai_company/tools/file_system/_base_fs_tool.py create mode 100644 src/ai_company/tools/file_system/_path_validator.py create mode 100644 src/ai_company/tools/file_system/delete_file.py create mode 100644 src/ai_company/tools/file_system/edit_file.py create mode 100644 src/ai_company/tools/file_system/list_directory.py create mode 100644 src/ai_company/tools/file_system/read_file.py create mode 100644 src/ai_company/tools/file_system/write_file.py create mode 100644 tests/unit/tools/file_system/__init__.py create mode 100644 tests/unit/tools/file_system/conftest.py create mode 100644 tests/unit/tools/file_system/test_delete_file.py create mode 100644 tests/unit/tools/file_system/test_edit_file.py create mode 100644 tests/unit/tools/file_system/test_list_directory.py create mode 100644 tests/unit/tools/file_system/test_path_validator.py create mode 100644 tests/unit/tools/file_system/test_read_file.py create mode 100644 tests/unit/tools/file_system/test_write_file.py diff --git a/src/ai_company/observability/events/tool.py b/src/ai_company/observability/events/tool.py index a90c8cb292..69fec52f1f 100644 --- a/src/ai_company/observability/events/tool.py +++ b/src/ai_company/observability/events/tool.py @@ -22,3 +22,15 @@ TOOL_PERMISSION_DENIED: Final[str] = "tool.permission.denied" TOOL_PERMISSION_CHECKER_CREATED: Final[str] = "tool.permission.checker_created" TOOL_PERMISSION_FILTERED: Final[str] = "tool.permission.filtered" + +# ── File system tool events ────────────────────────────────────── +TOOL_FS_READ: Final[str] = "tool.fs.read" +TOOL_FS_WRITE: Final[str] = "tool.fs.write" +TOOL_FS_EDIT: Final[str] = "tool.fs.edit" +TOOL_FS_EDIT_NOT_FOUND: Final[str] = "tool.fs.edit_not_found" +TOOL_FS_LIST: Final[str] = "tool.fs.list" +TOOL_FS_DELETE: Final[str] = "tool.fs.delete" +TOOL_FS_PATH_VIOLATION: Final[str] = "tool.fs.path_violation" +TOOL_FS_BINARY_DETECTED: Final[str] = "tool.fs.binary_detected" +TOOL_FS_SIZE_EXCEEDED: Final[str] = "tool.fs.size_exceeded" +TOOL_FS_ERROR: Final[str] = "tool.fs.error" diff --git a/src/ai_company/tools/__init__.py b/src/ai_company/tools/__init__.py index 680bdcc561..d36d969a3e 100644 --- a/src/ai_company/tools/__init__.py +++ b/src/ai_company/tools/__init__.py @@ -9,13 +9,28 @@ ToolPermissionDeniedError, ) from .examples.echo import EchoTool +from .file_system import ( + BaseFileSystemTool, + DeleteFileTool, + EditFileTool, + ListDirectoryTool, + PathValidator, + ReadFileTool, + WriteFileTool, +) from .invoker import ToolInvoker from .permissions import ToolPermissionChecker from .registry import ToolRegistry __all__ = [ + "BaseFileSystemTool", "BaseTool", + "DeleteFileTool", "EchoTool", + "EditFileTool", + "ListDirectoryTool", + "PathValidator", + "ReadFileTool", "ToolError", "ToolExecutionError", "ToolExecutionResult", @@ -25,4 +40,5 @@ "ToolPermissionChecker", "ToolPermissionDeniedError", "ToolRegistry", + "WriteFileTool", ] diff --git a/src/ai_company/tools/file_system/__init__.py b/src/ai_company/tools/file_system/__init__.py new file mode 100644 index 0000000000..e3629a3412 --- /dev/null +++ b/src/ai_company/tools/file_system/__init__.py @@ -0,0 +1,23 @@ +"""Built-in file system tools for workspace interaction. + +Provides tools for reading, writing, editing, listing, and deleting +files within a sandboxed workspace directory. +""" + +from ai_company.tools.file_system._base_fs_tool import BaseFileSystemTool +from ai_company.tools.file_system._path_validator import PathValidator +from ai_company.tools.file_system.delete_file import DeleteFileTool +from ai_company.tools.file_system.edit_file import EditFileTool +from ai_company.tools.file_system.list_directory import ListDirectoryTool +from ai_company.tools.file_system.read_file import ReadFileTool +from ai_company.tools.file_system.write_file import WriteFileTool + +__all__ = [ + "BaseFileSystemTool", + "DeleteFileTool", + "EditFileTool", + "ListDirectoryTool", + "PathValidator", + "ReadFileTool", + "WriteFileTool", +] diff --git a/src/ai_company/tools/file_system/_base_fs_tool.py b/src/ai_company/tools/file_system/_base_fs_tool.py new file mode 100644 index 0000000000..7b3d4fe163 --- /dev/null +++ b/src/ai_company/tools/file_system/_base_fs_tool.py @@ -0,0 +1,57 @@ +"""Base class for file system tools. + +Provides the common ``ToolCategory.FILE_SYSTEM`` category and a +``PathValidator`` instance bound to the workspace root. +""" + +from abc import ABC +from typing import TYPE_CHECKING, Any + +from ai_company.core.enums import ToolCategory +from ai_company.tools.base import BaseTool +from ai_company.tools.file_system._path_validator import PathValidator + +if TYPE_CHECKING: + from pathlib import Path + + +class BaseFileSystemTool(BaseTool, ABC): + """Abstract base for all file system tools. + + Sets ``category=ToolCategory.FILE_SYSTEM`` and holds a shared + ``PathValidator`` for workspace-scoped path resolution. + """ + + def __init__( + self, + *, + workspace_root: Path, + name: str, + description: str = "", + parameters_schema: dict[str, Any] | None = None, + ) -> None: + """Initialize with a workspace root and tool metadata. + + Args: + workspace_root: Root directory bounding file access. + name: Tool name. + description: Human-readable description. + parameters_schema: JSON Schema for tool parameters. + """ + super().__init__( + name=name, + description=description, + category=ToolCategory.FILE_SYSTEM, + parameters_schema=parameters_schema, + ) + self._path_validator = PathValidator(workspace_root) + + @property + def workspace_root(self) -> Path: + """The resolved workspace root directory.""" + return self._path_validator.workspace_root + + @property + def path_validator(self) -> PathValidator: + """The path validator instance.""" + return self._path_validator diff --git a/src/ai_company/tools/file_system/_path_validator.py b/src/ai_company/tools/file_system/_path_validator.py new file mode 100644 index 0000000000..3de7d1228e --- /dev/null +++ b/src/ai_company/tools/file_system/_path_validator.py @@ -0,0 +1,91 @@ +"""Workspace path validation for file system tools. + +All file system tools delegate path validation to ``PathValidator``, +which ensures that resolved paths remain within the configured +workspace root. This prevents path-traversal attacks via ``../``, +symlinks, or absolute paths outside the workspace. +""" + +from typing import TYPE_CHECKING + +from ai_company.observability import get_logger +from ai_company.observability.events.tool import TOOL_FS_PATH_VIOLATION + +if TYPE_CHECKING: + from pathlib import Path + +logger = get_logger(__name__) + + +class PathValidator: + """Validates and resolves paths against a workspace root. + + All resolved paths must remain within ``workspace_root``. + Symlinks are resolved before checking, so a symlink pointing + outside the workspace is rejected. + + Attributes: + workspace_root: The resolved workspace root directory. + """ + + def __init__(self, workspace_root: Path) -> None: + """Initialize with a workspace root directory. + + Args: + workspace_root: Root directory that bounds all file access. + + Raises: + ValueError: If workspace_root does not exist or is not a + directory. + """ + resolved = workspace_root.resolve() + if not resolved.is_dir(): + msg = f"Workspace root is not an existing directory: {workspace_root}" + raise ValueError(msg) + self._workspace_root = resolved + + @property + def workspace_root(self) -> Path: + """The resolved workspace root directory.""" + return self._workspace_root + + def validate(self, path: str) -> Path: + """Resolve *path* against workspace root and validate containment. + + Args: + path: A relative or absolute path string from the user. + + Returns: + The resolved ``Path`` guaranteed to be within the workspace. + + Raises: + ValueError: If the resolved path escapes the workspace. + """ + resolved = (self._workspace_root / path).resolve() + if not resolved.is_relative_to(self._workspace_root): + logger.warning( + TOOL_FS_PATH_VIOLATION, + user_path=path, + ) + msg = f"Path escapes workspace: {path}" + raise ValueError(msg) + return resolved + + def validate_parent_exists(self, path: str) -> Path: + """Like ``validate`` but also checks that the parent directory exists. + + Args: + path: A relative or absolute path string from the user. + + Returns: + The resolved ``Path`` within the workspace whose parent exists. + + Raises: + ValueError: If the resolved path escapes the workspace or + the parent directory does not exist. + """ + resolved = self.validate(path) + if not resolved.parent.exists(): + msg = f"Parent directory does not exist: {path}" + raise ValueError(msg) + return resolved diff --git a/src/ai_company/tools/file_system/delete_file.py b/src/ai_company/tools/file_system/delete_file.py new file mode 100644 index 0000000000..231c91ae57 --- /dev/null +++ b/src/ai_company/tools/file_system/delete_file.py @@ -0,0 +1,127 @@ +"""Delete file tool — removes a single file from the workspace.""" + +import asyncio +from typing import TYPE_CHECKING, Any + +from ai_company.observability import get_logger +from ai_company.observability.events.tool import TOOL_FS_DELETE, TOOL_FS_ERROR +from ai_company.tools.base import ToolExecutionResult +from ai_company.tools.file_system._base_fs_tool import BaseFileSystemTool + +if TYPE_CHECKING: + from pathlib import Path + +logger = get_logger(__name__) + + +def _delete_sync(resolved: Path) -> int: + """Delete file synchronously, returning its size before deletion.""" + size = resolved.stat().st_size + resolved.unlink() + return size + + +class DeleteFileTool(BaseFileSystemTool): + """Deletes a single file within the workspace. + + Directories cannot be deleted with this tool — only regular files. + This tool has ``require_elevated = True``, signalling to the engine + that elevated permissions are needed. + + Examples: + Delete a file:: + + tool = DeleteFileTool(workspace_root=Path("/ws")) + result = await tool.execute(arguments={"path": "tmp.txt"}) + """ + + def __init__(self, *, workspace_root: Path) -> None: + """Initialize the delete-file tool. + + Args: + workspace_root: Root directory bounding file access. + """ + super().__init__( + workspace_root=workspace_root, + name="delete_file", + description="Delete a single file from the workspace.", + parameters_schema={ + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "File path relative to workspace", + }, + }, + "required": ["path"], + "additionalProperties": False, + }, + ) + + @property + def require_elevated(self) -> bool: + """Whether this tool requires elevated permissions.""" + return True + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Delete a file from the workspace. + + Args: + arguments: Must contain ``path``. + + Returns: + A ``ToolExecutionResult`` confirming deletion or an error. + """ + user_path: str = arguments["path"] + + try: + resolved = self.path_validator.validate(user_path) + except ValueError as exc: + return ToolExecutionResult(content=str(exc), is_error=True) + + if not resolved.exists(): + logger.warning(TOOL_FS_ERROR, path=user_path, error="not_found") + return ToolExecutionResult( + content=f"File not found: {user_path}", + is_error=True, + ) + + if resolved.is_dir(): + logger.warning(TOOL_FS_ERROR, path=user_path, error="is_directory") + return ToolExecutionResult( + content=f"Cannot delete directory (use a dedicated tool): {user_path}", + is_error=True, + ) + + try: + size_bytes = await asyncio.to_thread(_delete_sync, resolved) + except PermissionError: + logger.warning(TOOL_FS_ERROR, path=user_path, error="permission_denied") + return ToolExecutionResult( + content=f"Permission denied: {user_path}", + is_error=True, + ) + except OSError as exc: + logger.warning(TOOL_FS_ERROR, path=user_path, error=str(exc)) + return ToolExecutionResult( + content=f"OS error deleting file: {user_path}", + is_error=True, + ) + + logger.info( + TOOL_FS_DELETE, + path=user_path, + size_bytes=size_bytes, + ) + + return ToolExecutionResult( + content=f"Deleted {user_path} ({size_bytes} bytes)", + metadata={ + "path": user_path, + "size_bytes": size_bytes, + }, + ) diff --git a/src/ai_company/tools/file_system/edit_file.py b/src/ai_company/tools/file_system/edit_file.py new file mode 100644 index 0000000000..8ccf5b7d5b --- /dev/null +++ b/src/ai_company/tools/file_system/edit_file.py @@ -0,0 +1,193 @@ +"""Edit file tool — search-and-replace within workspace files.""" + +import asyncio +from typing import TYPE_CHECKING, Any + +from ai_company.observability import get_logger +from ai_company.observability.events.tool import ( + TOOL_FS_EDIT, + TOOL_FS_EDIT_NOT_FOUND, + TOOL_FS_ERROR, +) +from ai_company.tools.base import ToolExecutionResult +from ai_company.tools.file_system._base_fs_tool import BaseFileSystemTool + +if TYPE_CHECKING: + from pathlib import Path + +logger = get_logger(__name__) + + +def _edit_sync(resolved: Path, old_text: str, new_text: str) -> tuple[str, int]: + """Perform search-and-replace synchronously. + + Returns: + Tuple of (new_content, occurrences_found). + """ + content = resolved.read_text(encoding="utf-8") + count = content.count(old_text) + if count > 0: + new_content = content.replace(old_text, new_text, 1) + resolved.write_text(new_content, encoding="utf-8") + return content, count + + +class EditFileTool(BaseFileSystemTool): + """Replaces the first occurrence of ``old_text`` with ``new_text``. + + If ``old_text`` is not found, returns an error with a snippet of the + file content to help the LLM adjust its search string. When + multiple occurrences exist, only the first is replaced and a warning + is included in the output. + + Examples: + Replace text:: + + tool = EditFileTool(workspace_root=Path("/ws")) + result = await tool.execute( + arguments={ + "path": "main.py", + "old_text": "foo", + "new_text": "bar", + } + ) + """ + + def __init__(self, *, workspace_root: Path) -> None: + """Initialize the edit-file tool. + + Args: + workspace_root: Root directory bounding file access. + """ + super().__init__( + workspace_root=workspace_root, + name="edit_file", + description=( + "Replace the first occurrence of old_text with new_text " + "in a file. Use empty new_text to delete text." + ), + parameters_schema={ + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "File path relative to workspace", + }, + "old_text": { + "type": "string", + "description": "Exact text to find", + }, + "new_text": { + "type": "string", + "description": ("Replacement text (empty string to delete)"), + }, + }, + "required": ["path", "old_text", "new_text"], + "additionalProperties": False, + }, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Edit a file by replacing text. + + Args: + arguments: Must contain ``path``, ``old_text``, and + ``new_text``. + + Returns: + A ``ToolExecutionResult`` confirming the edit or an error. + """ + user_path: str = arguments["path"] + old_text: str = arguments["old_text"] + new_text: str = arguments["new_text"] + + if old_text == new_text: + return ToolExecutionResult( + content=f"No change needed in {user_path}: " + "old_text and new_text are identical", + metadata={ + "path": user_path, + "occurrences_found": 0, + "occurrences_replaced": 0, + }, + ) + + try: + resolved = self.path_validator.validate(user_path) + except ValueError as exc: + return ToolExecutionResult(content=str(exc), is_error=True) + + try: + content, count = await asyncio.to_thread( + _edit_sync, resolved, old_text, new_text + ) + except UnicodeDecodeError: + logger.warning(TOOL_FS_ERROR, path=user_path, error="binary") + return ToolExecutionResult( + content=f"Cannot edit binary file: {user_path}", + is_error=True, + ) + except (FileNotFoundError, IsADirectoryError, PermissionError, OSError) as exc: + _err_msgs: dict[type, tuple[str, str]] = { + FileNotFoundError: ( + "not_found", + f"File not found: {user_path}", + ), + IsADirectoryError: ( + "is_directory", + f"Path is a directory, not a file: {user_path}", + ), + PermissionError: ( + "permission_denied", + f"Permission denied: {user_path}", + ), + } + log_key, msg = _err_msgs.get( + type(exc), + (str(exc), f"OS error editing file: {user_path}"), + ) + logger.warning(TOOL_FS_ERROR, path=user_path, error=log_key) + return ToolExecutionResult(content=msg, is_error=True) + + if count == 0: + snippet = content[:500] + logger.info( + TOOL_FS_EDIT_NOT_FOUND, + path=user_path, + old_text_preview=old_text[:100], + ) + return ToolExecutionResult( + content=( + f"Text not found in {user_path}. File content preview:\n{snippet}" + ), + is_error=True, + metadata={ + "path": user_path, + "occurrences_found": 0, + "occurrences_replaced": 0, + }, + ) + + msg = f"Replaced 1 occurrence in {user_path}" + if count > 1: + msg += f" (warning: {count} total occurrences found, only first replaced)" + + logger.info( + TOOL_FS_EDIT, + path=user_path, + occurrences_found=count, + occurrences_replaced=1, + ) + + return ToolExecutionResult( + content=msg, + metadata={ + "path": user_path, + "occurrences_found": count, + "occurrences_replaced": 1, + }, + ) diff --git a/src/ai_company/tools/file_system/list_directory.py b/src/ai_company/tools/file_system/list_directory.py new file mode 100644 index 0000000000..01e1ea0590 --- /dev/null +++ b/src/ai_company/tools/file_system/list_directory.py @@ -0,0 +1,189 @@ +"""List directory tool — lists entries in a workspace directory.""" + +import asyncio +from typing import TYPE_CHECKING, Any, Final + +from ai_company.observability import get_logger +from ai_company.observability.events.tool import TOOL_FS_ERROR, TOOL_FS_LIST +from ai_company.tools.base import ToolExecutionResult +from ai_company.tools.file_system._base_fs_tool import BaseFileSystemTool + +if TYPE_CHECKING: + from pathlib import Path + +logger = get_logger(__name__) + +MAX_ENTRIES: Final[int] = 1000 + + +def _list_sync( + resolved: Path, + workspace_root: Path, + pattern: str | None, + *, + recursive: bool, +) -> list[str]: + """Collect directory entries synchronously.""" + if recursive: + glob_pattern = pattern or "*" + entries = sorted(resolved.rglob(glob_pattern)) + elif pattern: + entries = sorted(resolved.glob(pattern)) + else: + entries = sorted(resolved.iterdir()) + + lines: list[str] = [] + for entry in entries: + try: + if entry.is_symlink(): + target = entry.resolve() + if not target.is_relative_to(workspace_root): + lines.append(f"[SYMLINK] {entry.name} -> (outside workspace)") + continue + + if entry.is_dir(): + lines.append(f"[DIR] {entry.name}/") + else: + try: + size = entry.stat().st_size + except OSError: + size = 0 + lines.append(f"[FILE] {entry.name} ({size} bytes)") + except OSError: + lines.append(f"[ERROR] {entry.name}") + + return lines + + +class ListDirectoryTool(BaseFileSystemTool): + """Lists files and directories within the workspace. + + Supports optional glob filtering and recursive listing. Output is + sorted alphabetically with type prefixes (``[DIR]`` / ``[FILE]``). + Results are capped at 1000 entries to prevent excessive output. + + Examples: + List current directory:: + + tool = ListDirectoryTool(workspace_root=Path("/ws")) + result = await tool.execute(arguments={}) + """ + + def __init__(self, *, workspace_root: Path) -> None: + """Initialize the list-directory tool. + + Args: + workspace_root: Root directory bounding file access. + """ + super().__init__( + workspace_root=workspace_root, + name="list_directory", + description=( + "List files and directories. Supports glob filtering " + "and recursive listing." + ), + parameters_schema={ + "type": "object", + "properties": { + "path": { + "type": "string", + "description": ( + 'Directory path relative to workspace (default ".")' + ), + "default": ".", + }, + "pattern": { + "type": "string", + "description": 'Glob filter (e.g. "*.py")', + }, + "recursive": { + "type": "boolean", + "description": "Recursive listing (default false)", + "default": False, + }, + }, + "additionalProperties": False, + }, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """List directory contents. + + Args: + arguments: Optionally contains ``path``, ``pattern``, and + ``recursive``. + + Returns: + A ``ToolExecutionResult`` with the listing or an error. + """ + user_path: str = arguments.get("path", ".") + pattern: str | None = arguments.get("pattern") + recursive: bool = arguments.get("recursive", False) + + try: + resolved = self.path_validator.validate(user_path) + except ValueError as exc: + return ToolExecutionResult(content=str(exc), is_error=True) + + if not resolved.is_dir(): + return ToolExecutionResult( + content=f"Not a directory: {user_path}", + is_error=True, + ) + + try: + lines = await asyncio.to_thread( + _list_sync, + resolved, + self.workspace_root, + pattern, + recursive=recursive, + ) + except PermissionError: + logger.warning(TOOL_FS_ERROR, path=user_path, error="permission_denied") + return ToolExecutionResult( + content=f"Permission denied: {user_path}", + is_error=True, + ) + except OSError as exc: + logger.warning(TOOL_FS_ERROR, path=user_path, error=str(exc)) + return ToolExecutionResult( + content=f"OS error listing directory: {user_path}", + is_error=True, + ) + + truncated = len(lines) > MAX_ENTRIES + if truncated: + lines = lines[:MAX_ENTRIES] + + dir_count = sum(1 for ln in lines if ln.startswith("[DIR]")) + file_count = sum(1 for ln in lines if ln.startswith("[FILE]")) + + output = "\n".join(lines) + if truncated: + output += f"\n\n[Truncated: showing first {MAX_ENTRIES} of more entries]" + + if not lines: + output = f"Directory is empty: {user_path}" + + logger.info( + TOOL_FS_LIST, + path=user_path, + total_entries=len(lines), + directories=dir_count, + files=file_count, + ) + + return ToolExecutionResult( + content=output, + metadata={ + "path": user_path, + "total_entries": len(lines), + "directories": dir_count, + "files": file_count, + }, + ) diff --git a/src/ai_company/tools/file_system/read_file.py b/src/ai_company/tools/file_system/read_file.py new file mode 100644 index 0000000000..18fe530c9c --- /dev/null +++ b/src/ai_company/tools/file_system/read_file.py @@ -0,0 +1,180 @@ +"""Read file tool — reads file content from the workspace. + +Supports optional line-range selection and enforces a maximum +file-size guard to prevent loading excessively large files. +""" + +import asyncio +from typing import TYPE_CHECKING, Any, Final + +from ai_company.observability import get_logger +from ai_company.observability.events.tool import ( + TOOL_FS_BINARY_DETECTED, + TOOL_FS_ERROR, + TOOL_FS_READ, + TOOL_FS_SIZE_EXCEEDED, +) +from ai_company.tools.base import ToolExecutionResult +from ai_company.tools.file_system._base_fs_tool import BaseFileSystemTool + +if TYPE_CHECKING: + from pathlib import Path + +logger = get_logger(__name__) + +MAX_FILE_SIZE_BYTES: Final[int] = 1_048_576 # 1 MB + + +def _read_sync(resolved: Path, start: int | None, end: int | None) -> str: + """Read file content synchronously, with optional line slicing.""" + raw = resolved.read_text(encoding="utf-8") + if start is not None or end is not None: + lines = raw.splitlines(keepends=True) + s = (start - 1) if start is not None else 0 + e = end if end is not None else len(lines) + raw = "".join(lines[s:e]) + return raw + + +class ReadFileTool(BaseFileSystemTool): + """Reads the content of a file within the workspace. + + Supports optional ``start_line`` / ``end_line`` for partial reads. + Files exceeding 1 MB are truncated with a warning. Binary files + (non-UTF-8) produce an error result. + + Examples: + Read an entire file:: + + tool = ReadFileTool(workspace_root=Path("/ws")) + result = await tool.execute(arguments={"path": "src/main.py"}) + """ + + def __init__(self, *, workspace_root: Path) -> None: + """Initialize the read-file tool. + + Args: + workspace_root: Root directory bounding file access. + """ + super().__init__( + workspace_root=workspace_root, + name="read_file", + description=( + "Read the contents of a file. Supports optional " + "line-range selection via start_line and end_line." + ), + parameters_schema={ + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "File path relative to workspace", + }, + "start_line": { + "type": "integer", + "minimum": 1, + "description": "First line to read (1-based inclusive)", + }, + "end_line": { + "type": "integer", + "minimum": 1, + "description": "Last line to read (1-based inclusive)", + }, + }, + "required": ["path"], + "additionalProperties": False, + }, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Read a file and return its content. + + Args: + arguments: Must contain ``path``; optionally ``start_line`` + and ``end_line``. + + Returns: + A ``ToolExecutionResult`` with the file content or an error. + """ + user_path: str = arguments["path"] + start_line: int | None = arguments.get("start_line") + end_line: int | None = arguments.get("end_line") + + try: + resolved = self.path_validator.validate(user_path) + except ValueError as exc: + return ToolExecutionResult(content=str(exc), is_error=True) + + try: + size = await asyncio.to_thread(resolved.stat) + size_bytes = size.st_size + + if size_bytes > MAX_FILE_SIZE_BYTES: + logger.warning( + TOOL_FS_SIZE_EXCEEDED, + path=user_path, + size_bytes=size_bytes, + max_bytes=MAX_FILE_SIZE_BYTES, + ) + + content = await asyncio.to_thread( + _read_sync, resolved, start_line, end_line + ) + + if size_bytes > MAX_FILE_SIZE_BYTES and not (start_line or end_line): + content = content[:MAX_FILE_SIZE_BYTES] + content += ( + f"\n\n[Truncated: file is {size_bytes:,} bytes, " + f"showing first {MAX_FILE_SIZE_BYTES:,}]" + ) + + line_count = content.count("\n") + ( + 1 if content and not content.endswith("\n") else 0 + ) + + logger.info( + TOOL_FS_READ, + path=user_path, + size_bytes=size_bytes, + line_count=line_count, + ) + + return ToolExecutionResult( + content=content, + metadata={ + "path": user_path, + "size_bytes": size_bytes, + "line_count": line_count, + }, + ) + except UnicodeDecodeError: + logger.warning(TOOL_FS_BINARY_DETECTED, path=user_path) + return ToolExecutionResult( + content=f"Cannot read binary file: {user_path}", + is_error=True, + ) + except (FileNotFoundError, IsADirectoryError, PermissionError, OSError) as exc: + _err_msgs: dict[type, tuple[str, str]] = { + FileNotFoundError: ( + "not_found", + f"File not found: {user_path}", + ), + IsADirectoryError: ( + "is_directory", + f"Path is a directory, not a file: {user_path}", + ), + PermissionError: ( + "permission_denied", + f"Permission denied: {user_path}", + ), + } + log_key, msg = _err_msgs.get( + type(exc), + (str(exc), f"OS error reading file: {user_path}"), + ) + logger.warning(TOOL_FS_ERROR, path=user_path, error=log_key) + return ToolExecutionResult(content=msg, is_error=True) diff --git a/src/ai_company/tools/file_system/write_file.py b/src/ai_company/tools/file_system/write_file.py new file mode 100644 index 0000000000..76d372a20a --- /dev/null +++ b/src/ai_company/tools/file_system/write_file.py @@ -0,0 +1,147 @@ +"""Write file tool — creates or overwrites files in the workspace.""" + +import asyncio +from typing import TYPE_CHECKING, Any + +from ai_company.observability import get_logger +from ai_company.observability.events.tool import TOOL_FS_ERROR, TOOL_FS_WRITE +from ai_company.tools.base import ToolExecutionResult +from ai_company.tools.file_system._base_fs_tool import BaseFileSystemTool + +if TYPE_CHECKING: + from pathlib import Path + +logger = get_logger(__name__) + + +def _write_sync(resolved: Path, content: str, *, create_dirs: bool) -> tuple[int, bool]: + """Write content to file synchronously. + + Returns: + Tuple of (bytes_written, created) where *created* is True if + the file did not exist before the write. + """ + created = not resolved.exists() + if create_dirs: + resolved.parent.mkdir(parents=True, exist_ok=True) + resolved.write_text(content, encoding="utf-8") + return resolved.stat().st_size, created + + +class WriteFileTool(BaseFileSystemTool): + """Creates or overwrites a file within the workspace. + + Optionally creates parent directories when ``create_directories`` + is True. + + Examples: + Write a new file:: + + tool = WriteFileTool(workspace_root=Path("/ws")) + result = await tool.execute( + arguments={"path": "out.txt", "content": "hello"} + ) + """ + + def __init__(self, *, workspace_root: Path) -> None: + """Initialize the write-file tool. + + Args: + workspace_root: Root directory bounding file access. + """ + super().__init__( + workspace_root=workspace_root, + name="write_file", + description=( + "Write content to a file, creating or overwriting it. " + "Set create_directories to true to create parent dirs." + ), + parameters_schema={ + "type": "object", + "properties": { + "path": { + "type": "string", + "description": "File path relative to workspace", + }, + "content": { + "type": "string", + "description": "Content to write", + }, + "create_directories": { + "type": "boolean", + "description": ( + "Create parent directories if missing (default false)" + ), + "default": False, + }, + }, + "required": ["path", "content"], + "additionalProperties": False, + }, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + """Write content to a file. + + Args: + arguments: Must contain ``path`` and ``content``; optionally + ``create_directories``. + + Returns: + A ``ToolExecutionResult`` confirming the write or an error. + """ + user_path: str = arguments["path"] + content: str = arguments["content"] + create_dirs: bool = arguments.get("create_directories", False) + + try: + if create_dirs: + resolved = self.path_validator.validate(user_path) + else: + resolved = self.path_validator.validate_parent_exists(user_path) + except ValueError as exc: + return ToolExecutionResult(content=str(exc), is_error=True) + + try: + bytes_written, created = await asyncio.to_thread( + _write_sync, resolved, content, create_dirs=create_dirs + ) + + action = "Created" if created else "Updated" + logger.info( + TOOL_FS_WRITE, + path=user_path, + bytes_written=bytes_written, + created=created, + ) + + return ToolExecutionResult( + content=f"{action} {user_path} ({bytes_written} bytes)", + metadata={ + "path": user_path, + "bytes_written": bytes_written, + "created": created, + }, + ) + except IsADirectoryError: + logger.warning(TOOL_FS_ERROR, path=user_path, error="is_directory") + return ToolExecutionResult( + content=f"Path is a directory, not a file: {user_path}", + is_error=True, + ) + except PermissionError: + logger.warning(TOOL_FS_ERROR, path=user_path, error="permission_denied") + return ToolExecutionResult( + content=f"Permission denied: {user_path}", + is_error=True, + ) + except OSError as exc: + logger.warning(TOOL_FS_ERROR, path=user_path, error=str(exc)) + return ToolExecutionResult( + content=f"OS error writing file: {user_path}", + is_error=True, + ) diff --git a/tests/unit/tools/file_system/__init__.py b/tests/unit/tools/file_system/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/unit/tools/file_system/conftest.py b/tests/unit/tools/file_system/conftest.py new file mode 100644 index 0000000000..882917733e --- /dev/null +++ b/tests/unit/tools/file_system/conftest.py @@ -0,0 +1,55 @@ +"""Shared fixtures for file system tool tests.""" + +from typing import TYPE_CHECKING + +import pytest + +from ai_company.tools.file_system.delete_file import DeleteFileTool +from ai_company.tools.file_system.edit_file import EditFileTool +from ai_company.tools.file_system.list_directory import ListDirectoryTool +from ai_company.tools.file_system.read_file import ReadFileTool +from ai_company.tools.file_system.write_file import WriteFileTool + +if TYPE_CHECKING: + from pathlib import Path + + +@pytest.fixture +def workspace(tmp_path: Path) -> Path: + """Create a temporary workspace directory with sample files.""" + (tmp_path / "hello.txt").write_text("Hello, world!\n", encoding="utf-8") + (tmp_path / "empty.txt").write_text("", encoding="utf-8") + sub = tmp_path / "subdir" + sub.mkdir() + (sub / "nested.py").write_text("print('nested')\n", encoding="utf-8") + return tmp_path + + +@pytest.fixture +def read_tool(workspace: Path) -> ReadFileTool: + """ReadFileTool bound to the test workspace.""" + return ReadFileTool(workspace_root=workspace) + + +@pytest.fixture +def write_tool(workspace: Path) -> WriteFileTool: + """WriteFileTool bound to the test workspace.""" + return WriteFileTool(workspace_root=workspace) + + +@pytest.fixture +def edit_tool(workspace: Path) -> EditFileTool: + """EditFileTool bound to the test workspace.""" + return EditFileTool(workspace_root=workspace) + + +@pytest.fixture +def list_tool(workspace: Path) -> ListDirectoryTool: + """ListDirectoryTool bound to the test workspace.""" + return ListDirectoryTool(workspace_root=workspace) + + +@pytest.fixture +def delete_tool(workspace: Path) -> DeleteFileTool: + """DeleteFileTool bound to the test workspace.""" + return DeleteFileTool(workspace_root=workspace) diff --git a/tests/unit/tools/file_system/test_delete_file.py b/tests/unit/tools/file_system/test_delete_file.py new file mode 100644 index 0000000000..7756b26caa --- /dev/null +++ b/tests/unit/tools/file_system/test_delete_file.py @@ -0,0 +1,65 @@ +"""Tests for DeleteFileTool.""" + +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from pathlib import Path + + from ai_company.tools.file_system.delete_file import DeleteFileTool + + +@pytest.mark.unit +class TestDeleteFileProperties: + """Tool metadata tests.""" + + def test_require_elevated(self, delete_tool: DeleteFileTool) -> None: + assert delete_tool.require_elevated is True + + +@pytest.mark.unit +class TestDeleteFileExecution: + """Execution tests.""" + + async def test_delete_existing_file( + self, workspace: Path, delete_tool: DeleteFileTool + ) -> None: + assert (workspace / "hello.txt").exists() + result = await delete_tool.execute(arguments={"path": "hello.txt"}) + assert not result.is_error + assert "Deleted" in result.content + assert not (workspace / "hello.txt").exists() + assert result.metadata["path"] == "hello.txt" + assert result.metadata["size_bytes"] > 0 + + async def test_delete_nonexistent_file(self, delete_tool: DeleteFileTool) -> None: + result = await delete_tool.execute(arguments={"path": "does_not_exist.txt"}) + assert result.is_error + assert "not found" in result.content.lower() + + async def test_delete_directory_rejected(self, delete_tool: DeleteFileTool) -> None: + result = await delete_tool.execute(arguments={"path": "subdir"}) + assert result.is_error + assert "Cannot delete directory" in result.content + + async def test_path_traversal_blocked(self, delete_tool: DeleteFileTool) -> None: + result = await delete_tool.execute(arguments={"path": "../../../etc/passwd"}) + assert result.is_error + assert "escapes workspace" in result.content + + async def test_delete_nested_file( + self, workspace: Path, delete_tool: DeleteFileTool + ) -> None: + assert (workspace / "subdir" / "nested.py").exists() + result = await delete_tool.execute(arguments={"path": "subdir/nested.py"}) + assert not result.is_error + assert not (workspace / "subdir" / "nested.py").exists() + + async def test_delete_empty_file( + self, workspace: Path, delete_tool: DeleteFileTool + ) -> None: + result = await delete_tool.execute(arguments={"path": "empty.txt"}) + assert not result.is_error + assert not (workspace / "empty.txt").exists() + assert result.metadata["size_bytes"] == 0 diff --git a/tests/unit/tools/file_system/test_edit_file.py b/tests/unit/tools/file_system/test_edit_file.py new file mode 100644 index 0000000000..623ee8a0ed --- /dev/null +++ b/tests/unit/tools/file_system/test_edit_file.py @@ -0,0 +1,135 @@ +"""Tests for EditFileTool.""" + +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from pathlib import Path + + from ai_company.tools.file_system.edit_file import EditFileTool + + +@pytest.mark.unit +class TestEditFileExecution: + """Execution tests.""" + + async def test_replace_text(self, workspace: Path, edit_tool: EditFileTool) -> None: + result = await edit_tool.execute( + arguments={ + "path": "hello.txt", + "old_text": "world", + "new_text": "universe", + } + ) + assert not result.is_error + assert "Replaced 1 occurrence" in result.content + content = (workspace / "hello.txt").read_text(encoding="utf-8") + assert "universe" in content + assert "world" not in content + + async def test_delete_text_with_empty_new( + self, workspace: Path, edit_tool: EditFileTool + ) -> None: + result = await edit_tool.execute( + arguments={ + "path": "hello.txt", + "old_text": ", world", + "new_text": "", + } + ) + assert not result.is_error + content = (workspace / "hello.txt").read_text(encoding="utf-8") + assert content == "Hello!\n" + + async def test_text_not_found(self, edit_tool: EditFileTool) -> None: + result = await edit_tool.execute( + arguments={ + "path": "hello.txt", + "old_text": "nonexistent string", + "new_text": "replacement", + } + ) + assert result.is_error + assert "Text not found" in result.content + assert result.metadata["occurrences_found"] == 0 + + async def test_multiple_occurrences_replaces_first( + self, workspace: Path, edit_tool: EditFileTool + ) -> None: + (workspace / "dups.txt").write_text("aaa bbb aaa", encoding="utf-8") + result = await edit_tool.execute( + arguments={ + "path": "dups.txt", + "old_text": "aaa", + "new_text": "ccc", + } + ) + assert not result.is_error + assert "2 total occurrences" in result.content + content = (workspace / "dups.txt").read_text(encoding="utf-8") + assert content == "ccc bbb aaa" + assert result.metadata["occurrences_found"] == 2 + assert result.metadata["occurrences_replaced"] == 1 + + async def test_identical_old_new_text(self, edit_tool: EditFileTool) -> None: + result = await edit_tool.execute( + arguments={ + "path": "hello.txt", + "old_text": "same", + "new_text": "same", + } + ) + assert not result.is_error + assert "No change needed" in result.content + + async def test_file_not_found(self, edit_tool: EditFileTool) -> None: + result = await edit_tool.execute( + arguments={ + "path": "nope.txt", + "old_text": "a", + "new_text": "b", + } + ) + assert result.is_error + assert "not found" in result.content.lower() + + async def test_path_traversal_blocked(self, edit_tool: EditFileTool) -> None: + result = await edit_tool.execute( + arguments={ + "path": "../../../etc/hosts", + "old_text": "a", + "new_text": "b", + } + ) + assert result.is_error + assert "escapes workspace" in result.content + + async def test_binary_file_errors( + self, workspace: Path, edit_tool: EditFileTool + ) -> None: + (workspace / "bin.dat").write_bytes(b"\x00\x01\x80\xff") + result = await edit_tool.execute( + arguments={ + "path": "bin.dat", + "old_text": "x", + "new_text": "y", + } + ) + assert result.is_error + assert "binary" in result.content.lower() + + async def test_edit_preserves_other_content( + self, workspace: Path, edit_tool: EditFileTool + ) -> None: + (workspace / "multi.txt").write_text("line1\nline2\nline3\n", encoding="utf-8") + result = await edit_tool.execute( + arguments={ + "path": "multi.txt", + "old_text": "line2", + "new_text": "LINE_TWO", + } + ) + assert not result.is_error + content = (workspace / "multi.txt").read_text(encoding="utf-8") + assert content == "line1\nLINE_TWO\nline3\n" diff --git a/tests/unit/tools/file_system/test_list_directory.py b/tests/unit/tools/file_system/test_list_directory.py new file mode 100644 index 0000000000..8d0369243a --- /dev/null +++ b/tests/unit/tools/file_system/test_list_directory.py @@ -0,0 +1,91 @@ +"""Tests for ListDirectoryTool.""" + +from typing import TYPE_CHECKING + +import pytest + +from ai_company.tools.file_system.list_directory import ( + MAX_ENTRIES, + ListDirectoryTool, +) + +if TYPE_CHECKING: + from pathlib import Path + + +@pytest.mark.unit +class TestListDirectoryExecution: + """Execution tests.""" + + async def test_list_workspace_root(self, list_tool: ListDirectoryTool) -> None: + result = await list_tool.execute(arguments={}) + assert not result.is_error + assert "[FILE]" in result.content + assert "hello.txt" in result.content + assert "[DIR]" in result.content + assert "subdir" in result.content + + async def test_list_explicit_dot(self, list_tool: ListDirectoryTool) -> None: + result = await list_tool.execute(arguments={"path": "."}) + assert not result.is_error + assert "hello.txt" in result.content + + async def test_list_subdirectory(self, list_tool: ListDirectoryTool) -> None: + result = await list_tool.execute(arguments={"path": "subdir"}) + assert not result.is_error + assert "nested.py" in result.content + assert result.metadata["files"] >= 1 + + async def test_glob_pattern(self, list_tool: ListDirectoryTool) -> None: + result = await list_tool.execute(arguments={"path": ".", "pattern": "*.txt"}) + assert not result.is_error + assert "hello.txt" in result.content + # subdir should not appear when filtering by *.txt + assert "subdir" not in result.content + + async def test_recursive_listing(self, list_tool: ListDirectoryTool) -> None: + result = await list_tool.execute(arguments={"path": ".", "recursive": True}) + assert not result.is_error + assert "nested.py" in result.content + + async def test_empty_directory( + self, workspace: Path, list_tool: ListDirectoryTool + ) -> None: + (workspace / "empty_dir").mkdir() + result = await list_tool.execute(arguments={"path": "empty_dir"}) + assert not result.is_error + assert "empty" in result.content.lower() + + async def test_not_a_directory(self, list_tool: ListDirectoryTool) -> None: + result = await list_tool.execute(arguments={"path": "hello.txt"}) + assert result.is_error + assert "Not a directory" in result.content + + async def test_path_traversal_blocked(self, list_tool: ListDirectoryTool) -> None: + result = await list_tool.execute(arguments={"path": "../../.."}) + assert result.is_error + assert "escapes workspace" in result.content + + async def test_metadata_counts(self, list_tool: ListDirectoryTool) -> None: + result = await list_tool.execute(arguments={"path": "."}) + assert not result.is_error + assert result.metadata["directories"] >= 1 + assert result.metadata["files"] >= 1 + assert result.metadata["total_entries"] == ( + result.metadata["directories"] + result.metadata["files"] + ) + + async def test_file_shows_size(self, list_tool: ListDirectoryTool) -> None: + result = await list_tool.execute(arguments={"path": "."}) + assert not result.is_error + assert "bytes)" in result.content + + async def test_large_directory_truncation(self, workspace: Path) -> None: + big_dir = workspace / "big" + big_dir.mkdir() + for i in range(MAX_ENTRIES + 10): + (big_dir / f"file_{i:05d}.txt").write_text("x", encoding="utf-8") + tool = ListDirectoryTool(workspace_root=workspace) + result = await tool.execute(arguments={"path": "big"}) + assert not result.is_error + assert "Truncated" in result.content diff --git a/tests/unit/tools/file_system/test_path_validator.py b/tests/unit/tools/file_system/test_path_validator.py new file mode 100644 index 0000000000..056ee040d4 --- /dev/null +++ b/tests/unit/tools/file_system/test_path_validator.py @@ -0,0 +1,83 @@ +"""Tests for PathValidator.""" + +from typing import TYPE_CHECKING + +import pytest + +from ai_company.tools.file_system._path_validator import PathValidator + +if TYPE_CHECKING: + from pathlib import Path + + +@pytest.mark.unit +class TestPathValidatorInit: + """Constructor validation tests.""" + + def test_valid_workspace(self, tmp_path: Path) -> None: + pv = PathValidator(tmp_path) + assert pv.workspace_root == tmp_path.resolve() + + def test_nonexistent_workspace_raises(self, tmp_path: Path) -> None: + with pytest.raises(ValueError, match="not an existing directory"): + PathValidator(tmp_path / "nope") + + def test_file_as_workspace_raises(self, tmp_path: Path) -> None: + f = tmp_path / "file.txt" + f.write_text("x") + with pytest.raises(ValueError, match="not an existing directory"): + PathValidator(f) + + +@pytest.mark.unit +class TestValidate: + """Path validation tests.""" + + def test_relative_path(self, tmp_path: Path) -> None: + (tmp_path / "a.txt").write_text("x") + pv = PathValidator(tmp_path) + result = pv.validate("a.txt") + assert result == (tmp_path / "a.txt").resolve() + + def test_nested_path(self, tmp_path: Path) -> None: + sub = tmp_path / "d" + sub.mkdir() + (sub / "b.txt").write_text("x") + pv = PathValidator(tmp_path) + result = pv.validate("d/b.txt") + assert result == (sub / "b.txt").resolve() + + def test_traversal_rejected(self, tmp_path: Path) -> None: + pv = PathValidator(tmp_path) + with pytest.raises(ValueError, match="escapes workspace"): + pv.validate("../etc/passwd") + + def test_absolute_outside_rejected(self, tmp_path: Path) -> None: + pv = PathValidator(tmp_path) + with pytest.raises(ValueError, match="escapes workspace"): + pv.validate("/etc/passwd") + + def test_dot_path(self, tmp_path: Path) -> None: + pv = PathValidator(tmp_path) + result = pv.validate(".") + assert result == tmp_path.resolve() + + +@pytest.mark.unit +class TestValidateParentExists: + """Parent-existence validation tests.""" + + def test_existing_parent(self, tmp_path: Path) -> None: + pv = PathValidator(tmp_path) + result = pv.validate_parent_exists("new_file.txt") + assert result.parent == tmp_path.resolve() + + def test_missing_parent_raises(self, tmp_path: Path) -> None: + pv = PathValidator(tmp_path) + with pytest.raises(ValueError, match="Parent directory does not exist"): + pv.validate_parent_exists("no/such/dir/file.txt") + + def test_traversal_still_rejected(self, tmp_path: Path) -> None: + pv = PathValidator(tmp_path) + with pytest.raises(ValueError, match="escapes workspace"): + pv.validate_parent_exists("../../escape.txt") diff --git a/tests/unit/tools/file_system/test_read_file.py b/tests/unit/tools/file_system/test_read_file.py new file mode 100644 index 0000000000..19d53fdf97 --- /dev/null +++ b/tests/unit/tools/file_system/test_read_file.py @@ -0,0 +1,116 @@ +"""Tests for ReadFileTool.""" + +from typing import TYPE_CHECKING + +import pytest + +from ai_company.core.enums import ToolCategory +from ai_company.tools.file_system.read_file import MAX_FILE_SIZE_BYTES, ReadFileTool + +if TYPE_CHECKING: + from pathlib import Path + + +@pytest.mark.unit +class TestReadFileToolProperties: + """Tool metadata tests.""" + + def test_name(self, read_tool: ReadFileTool) -> None: + assert read_tool.name == "read_file" + + def test_category(self, read_tool: ReadFileTool) -> None: + assert read_tool.category == ToolCategory.FILE_SYSTEM + + def test_has_schema(self, read_tool: ReadFileTool) -> None: + schema = read_tool.parameters_schema + assert schema is not None + assert "path" in schema["properties"] + + +@pytest.mark.unit +class TestReadFileExecution: + """Execution tests.""" + + async def test_read_full_file(self, read_tool: ReadFileTool) -> None: + result = await read_tool.execute(arguments={"path": "hello.txt"}) + assert not result.is_error + assert "Hello, world!" in result.content + assert result.metadata["path"] == "hello.txt" + assert result.metadata["size_bytes"] > 0 + + async def test_read_empty_file(self, read_tool: ReadFileTool) -> None: + result = await read_tool.execute(arguments={"path": "empty.txt"}) + assert not result.is_error + assert result.content == "" + + async def test_read_nested_file(self, read_tool: ReadFileTool) -> None: + result = await read_tool.execute(arguments={"path": "subdir/nested.py"}) + assert not result.is_error + assert "print('nested')" in result.content + + async def test_read_with_line_range( + self, workspace: Path, read_tool: ReadFileTool + ) -> None: + (workspace / "multi.txt").write_text( + "line1\nline2\nline3\nline4\nline5\n", encoding="utf-8" + ) + result = await read_tool.execute( + arguments={"path": "multi.txt", "start_line": 2, "end_line": 4} + ) + assert not result.is_error + assert "line2" in result.content + assert "line4" in result.content + assert "line1" not in result.content + assert "line5" not in result.content + + async def test_read_start_line_only( + self, workspace: Path, read_tool: ReadFileTool + ) -> None: + (workspace / "multi.txt").write_text("a\nb\nc\n", encoding="utf-8") + result = await read_tool.execute( + arguments={"path": "multi.txt", "start_line": 2} + ) + assert not result.is_error + assert "b" in result.content + assert "c" in result.content + + async def test_read_end_line_only( + self, workspace: Path, read_tool: ReadFileTool + ) -> None: + (workspace / "multi.txt").write_text("a\nb\nc\n", encoding="utf-8") + result = await read_tool.execute(arguments={"path": "multi.txt", "end_line": 2}) + assert not result.is_error + assert "a" in result.content + assert "b" in result.content + + async def test_file_not_found(self, read_tool: ReadFileTool) -> None: + result = await read_tool.execute(arguments={"path": "nonexistent.txt"}) + assert result.is_error + assert "not found" in result.content.lower() + + async def test_path_traversal_blocked(self, read_tool: ReadFileTool) -> None: + result = await read_tool.execute(arguments={"path": "../../../etc/passwd"}) + assert result.is_error + assert "escapes workspace" in result.content + + async def test_read_directory_errors(self, read_tool: ReadFileTool) -> None: + result = await read_tool.execute(arguments={"path": "subdir"}) + assert result.is_error + + async def test_binary_file_errors( + self, workspace: Path, read_tool: ReadFileTool + ) -> None: + (workspace / "binary.bin").write_bytes(b"\x00\x01\x80\xff") + result = await read_tool.execute(arguments={"path": "binary.bin"}) + assert result.is_error + assert "binary" in result.content.lower() + + async def test_large_file_truncated( + self, workspace: Path, read_tool: ReadFileTool + ) -> None: + big_content = "x" * (MAX_FILE_SIZE_BYTES + 1000) + (workspace / "big.txt").write_text(big_content, encoding="utf-8") + result = await read_tool.execute(arguments={"path": "big.txt"}) + assert not result.is_error + assert "Truncated" in result.content + assert len(result.content) < len(big_content) diff --git a/tests/unit/tools/file_system/test_write_file.py b/tests/unit/tools/file_system/test_write_file.py new file mode 100644 index 0000000000..02f3e64df0 --- /dev/null +++ b/tests/unit/tools/file_system/test_write_file.py @@ -0,0 +1,85 @@ +"""Tests for WriteFileTool.""" + +from typing import TYPE_CHECKING + +import pytest + +if TYPE_CHECKING: + from pathlib import Path + + from ai_company.tools.file_system.write_file import WriteFileTool + + +@pytest.mark.unit +class TestWriteFileExecution: + """Execution tests.""" + + async def test_create_new_file( + self, workspace: Path, write_tool: WriteFileTool + ) -> None: + result = await write_tool.execute( + arguments={"path": "new.txt", "content": "brand new"} + ) + assert not result.is_error + assert "Created" in result.content + assert result.metadata["created"] is True + assert (workspace / "new.txt").read_text(encoding="utf-8") == "brand new" + + async def test_overwrite_existing_file( + self, workspace: Path, write_tool: WriteFileTool + ) -> None: + result = await write_tool.execute( + arguments={"path": "hello.txt", "content": "overwritten"} + ) + assert not result.is_error + assert "Updated" in result.content + assert result.metadata["created"] is False + assert (workspace / "hello.txt").read_text(encoding="utf-8") == "overwritten" + + async def test_missing_parent_without_create_dirs( + self, write_tool: WriteFileTool + ) -> None: + result = await write_tool.execute( + arguments={"path": "no/such/dir/file.txt", "content": "x"} + ) + assert result.is_error + assert "Parent directory does not exist" in result.content + + async def test_create_directories( + self, workspace: Path, write_tool: WriteFileTool + ) -> None: + result = await write_tool.execute( + arguments={ + "path": "a/b/c/deep.txt", + "content": "deep", + "create_directories": True, + } + ) + assert not result.is_error + assert result.metadata["created"] is True + assert (workspace / "a" / "b" / "c" / "deep.txt").read_text( + encoding="utf-8" + ) == "deep" + + async def test_path_traversal_blocked(self, write_tool: WriteFileTool) -> None: + result = await write_tool.execute( + arguments={"path": "../../escape.txt", "content": "bad"} + ) + assert result.is_error + assert "escapes workspace" in result.content + + async def test_write_empty_content( + self, workspace: Path, write_tool: WriteFileTool + ) -> None: + result = await write_tool.execute( + arguments={"path": "blank.txt", "content": ""} + ) + assert not result.is_error + assert (workspace / "blank.txt").read_text(encoding="utf-8") == "" + + async def test_bytes_written_metadata(self, write_tool: WriteFileTool) -> None: + result = await write_tool.execute( + arguments={"path": "sized.txt", "content": "hello"} + ) + assert not result.is_error + assert result.metadata["bytes_written"] > 0 From ffffdb617dd6b9afdb57924749c837a57203fc41 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 7 Mar 2026 09:32:23 +0100 Subject: [PATCH 2/3] feat: implement built-in file system tools (#18) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-reviewed by 10 agents, 28 findings addressed - Add PathValidator for workspace-scoped path sandboxing with symlink resolution - Implement ReadFileTool with bounded reads, line-range selection, binary detection - Implement WriteFileTool with parent directory creation, size guard (10 MB) - Implement EditFileTool with search-and-replace, size guard (1 MB), noop detection - Implement DeleteFileTool with TOCTOU-safe deletion, elevated permission flag - Implement ListDirectoryTool with glob filtering, recursive listing, truncation - Add BaseFileSystemTool ABC with shared _map_os_error helper - Add cross-platform is_dir() checks for Windows compatibility - Add unsafe glob pattern rejection to prevent workspace escape - Add 5 new observability event constants for filesystem operations - Update DESIGN_SPEC.md §15.3 project structure and §15.5 tool sandboxing - Update CLAUDE.md tools/ description to include built-in tools - Comprehensive test coverage: 44 tests across 6 test files Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 2 +- DESIGN_SPEC.md | 12 +- src/ai_company/observability/events/tool.py | 5 + .../tools/file_system/_base_fs_tool.py | 23 +++ .../tools/file_system/_path_validator.py | 20 ++- .../tools/file_system/delete_file.py | 35 +++-- src/ai_company/tools/file_system/edit_file.py | 100 +++++++----- .../tools/file_system/list_directory.py | 74 +++++++-- src/ai_company/tools/file_system/read_file.py | 147 +++++++++++------- .../tools/file_system/write_file.py | 71 ++++++--- .../tools/file_system/test_delete_file.py | 38 +++++ .../unit/tools/file_system/test_edit_file.py | 31 ++++ .../tools/file_system/test_list_directory.py | 68 +++++++- .../tools/file_system/test_path_validator.py | 45 ++++++ .../unit/tools/file_system/test_read_file.py | 16 ++ .../unit/tools/file_system/test_write_file.py | 19 +++ 16 files changed, 558 insertions(+), 148 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index bf36e2414b..ebaeb28e25 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -54,7 +54,7 @@ src/ai_company/ providers/ # LLM provider abstraction (LiteLLM adapter) security/ # SecOps agent, approval gates, audit templates/ # Pre-built company templates and builder - tools/ # Tool registry, MCP integration, role-based access, sandboxing + tools/ # Tool registry, MCP integration, role-based access, built-in tools (file_system/) ``` ## Shell Usage diff --git a/DESIGN_SPEC.md b/DESIGN_SPEC.md index 7bdfa34e85..9ee39e3196 100644 --- a/DESIGN_SPEC.md +++ b/DESIGN_SPEC.md @@ -2344,7 +2344,15 @@ ai-company/ │ │ │ ├── protocol.py # SandboxBackend protocol │ │ │ ├── subprocess.py # SubprocessSandbox (default for low-risk) │ │ │ └── docker.py # DockerSandbox (for code_runner, terminal) -│ │ ├── file_system.py # File operations (M3) +│ │ ├── file_system/ # Built-in file system tools +│ │ │ ├── __init__.py # Package exports +│ │ │ ├── _base_fs_tool.py # BaseFileSystemTool ABC +│ │ │ ├── _path_validator.py # Workspace path validation +│ │ │ ├── delete_file.py # DeleteFileTool +│ │ │ ├── edit_file.py # EditFileTool +│ │ │ ├── list_directory.py # ListDirectoryTool +│ │ │ ├── read_file.py # ReadFileTool +│ │ │ └── write_file.py # WriteFileTool │ │ ├── git_tools.py # Git operations (M3) │ │ ├── code_runner.py # Code execution (M3) │ │ ├── web_tools.py # HTTP, search (M3) @@ -2427,7 +2435,7 @@ These conventions were established during the M0–M2+ review cycle. **Adopted** | **Event constants** | Adopted (per-domain) | Per-domain submodules under `events/` package (e.g. `events.provider`, `events.budget`). Import directly: `from ai_company.observability.events. import CONSTANT` | Split by domain for discoverability, co-location with domain logic, and reduced merge conflicts as constants grow. `__init__.py` serves as package marker with usage documentation; no re-exports. | | **Parallel tool execution** | Adopted (M2.5) | `asyncio.TaskGroup` in `ToolInvoker.invoke_all` with optional `max_concurrency` semaphore | Structured concurrency with proper cancellation semantics. Fatal errors collected via guarded wrapper and re-raised after all tasks complete. | | **Tool permission checking** | Adopted (M3) | `ToolPermissionChecker` enforces category-level gating based on `ToolAccessLevel` (sandboxed → restricted → standard → elevated, plus custom). Priority-based resolution: denied list → allowed list → level categories → deny. Case-insensitive name matching. `ToolInvoker` filters definitions for prompt and checks at invocation time. | Defense-in-depth: agents only see permitted tools in the LLM prompt, and invocations are re-checked at execution time. Explicit allow/deny lists provide per-agent overrides. See §11.1.1. | -| **Tool sandboxing** | Planned (M3) | Layered `SandboxBackend` protocol: `SubprocessSandbox` for low-risk tools (file, git), `DockerSandbox` for high-risk tools (code_runner, terminal, web, database). `K8sSandbox` planned for future container deployments. | Risk-proportionate isolation. Docker optional — only needed for code execution and network-sensitive tools. Pluggable protocol enables seamless migration to K8s per-agent pods in Phase 3-4. See §11.1.2. | +| **Tool sandboxing** | Partial (M3) | File system tools use in-process `PathValidator` for workspace-scoped path validation (symlink resolution + containment check). `BaseFileSystemTool` ABC provides shared `ToolCategory.FILE_SYSTEM` and `PathValidator` integration — all file system tools extend this base. `SandboxBackend` protocol with `SubprocessSandbox` / `DockerSandbox` remains planned for git, code_runner, terminal, web, and database tools. `K8sSandbox` planned for future container deployments. | File system tools use defence-in-depth path validation; heavier sandbox isolation reserved for higher-risk tool categories (code execution, network). See §11.1.2. | | **Crash recovery** | Planned (M3) | Pluggable `RecoveryStrategy` protocol. M3: `FailAndReassignStrategy` (catch at engine boundary, log snapshot, mark FAILED, reassign). M4/M5: `CheckpointStrategy` (persist `AgentContext` per turn, resume from last checkpoint). | Immutable `model_copy` pattern makes checkpoint serialization trivial to add later. Fail-and-reassign is sufficient for short MVP tasks. See §6.6. | | **Agent behavior testing** | Planned (M3) | Scripted `FakeProvider` for unit tests (deterministic turn sequences); behavioral outcome assertions for integration tests (task completed, tools called, cost within budget). | Leverages existing `FakeProvider` and `CompletionResponseFactory` fixtures. Precise engine testing without brittle response-matching at integration level. | | **LLM call analytics** | Planned (incremental) | M3: proxy metrics (`turns_per_task`, `tokens_per_task`). M4: call categorization (`productive`, `coordination`, `system`) + orchestration ratio. M5+: full analytics (retry tracking, latency, cache hits, per-provider comparison). | Append-only, never blocks execution. Builds on existing `CostRecord` infrastructure. Detects orchestration overhead early. See §10.5. | diff --git a/src/ai_company/observability/events/tool.py b/src/ai_company/observability/events/tool.py index 69fec52f1f..fc73b99805 100644 --- a/src/ai_company/observability/events/tool.py +++ b/src/ai_company/observability/events/tool.py @@ -34,3 +34,8 @@ TOOL_FS_BINARY_DETECTED: Final[str] = "tool.fs.binary_detected" TOOL_FS_SIZE_EXCEEDED: Final[str] = "tool.fs.size_exceeded" TOOL_FS_ERROR: Final[str] = "tool.fs.error" +TOOL_FS_STAT_FAILED: Final[str] = "tool.fs.stat_failed" +TOOL_FS_WORKSPACE_INVALID: Final[str] = "tool.fs.workspace_invalid" +TOOL_FS_PARENT_NOT_FOUND: Final[str] = "tool.fs.parent_not_found" +TOOL_FS_GLOB_REJECTED: Final[str] = "tool.fs.glob_rejected" +TOOL_FS_NOOP: Final[str] = "tool.fs.noop" diff --git a/src/ai_company/tools/file_system/_base_fs_tool.py b/src/ai_company/tools/file_system/_base_fs_tool.py index 7b3d4fe163..93ea5a752a 100644 --- a/src/ai_company/tools/file_system/_base_fs_tool.py +++ b/src/ai_company/tools/file_system/_base_fs_tool.py @@ -8,12 +8,35 @@ from typing import TYPE_CHECKING, Any from ai_company.core.enums import ToolCategory +from ai_company.observability import get_logger from ai_company.tools.base import BaseTool from ai_company.tools.file_system._path_validator import PathValidator if TYPE_CHECKING: from pathlib import Path +logger = get_logger(__name__) + + +def _map_os_error(exc: OSError, user_path: str, verb: str) -> tuple[str, str]: + """Map an OS error to ``(log_key, user_message)`` for FS operations. + + Args: + exc: The caught OS-level exception. + user_path: The original user-supplied path string. + verb: Action verb for the fallback message (e.g. ``"reading"``). + + Returns: + A two-tuple of (structured log key, human-readable message). + """ + if isinstance(exc, FileNotFoundError): + return "not_found", f"File not found: {user_path}" + if isinstance(exc, IsADirectoryError): + return "is_directory", f"Path is a directory, not a file: {user_path}" + if isinstance(exc, PermissionError): + return "permission_denied", f"Permission denied: {user_path}" + return str(exc), f"OS error {verb} file: {user_path}" + class BaseFileSystemTool(BaseTool, ABC): """Abstract base for all file system tools. diff --git a/src/ai_company/tools/file_system/_path_validator.py b/src/ai_company/tools/file_system/_path_validator.py index 3de7d1228e..f341f01027 100644 --- a/src/ai_company/tools/file_system/_path_validator.py +++ b/src/ai_company/tools/file_system/_path_validator.py @@ -9,7 +9,11 @@ from typing import TYPE_CHECKING from ai_company.observability import get_logger -from ai_company.observability.events.tool import TOOL_FS_PATH_VIOLATION +from ai_company.observability.events.tool import ( + TOOL_FS_PARENT_NOT_FOUND, + TOOL_FS_PATH_VIOLATION, + TOOL_FS_WORKSPACE_INVALID, +) if TYPE_CHECKING: from pathlib import Path @@ -40,6 +44,10 @@ def __init__(self, workspace_root: Path) -> None: """ resolved = workspace_root.resolve() if not resolved.is_dir(): + logger.warning( + TOOL_FS_WORKSPACE_INVALID, + workspace_root=str(workspace_root), + ) msg = f"Workspace root is not an existing directory: {workspace_root}" raise ValueError(msg) self._workspace_root = resolved @@ -61,6 +69,12 @@ def validate(self, path: str) -> Path: Raises: ValueError: If the resolved path escapes the workspace. """ + # NOTE: There is an inherent TOCTOU gap between this validation + # and the actual file operation (which runs in asyncio.to_thread). + # A concurrent process could swap in a symlink between validation + # and use. Full mitigation requires OS-level sandboxing (e.g. + # openat2 RESOLVE_BENEATH on Linux). User-space path validation + # is a best-effort defence-in-depth layer. resolved = (self._workspace_root / path).resolve() if not resolved.is_relative_to(self._workspace_root): logger.warning( @@ -86,6 +100,10 @@ def validate_parent_exists(self, path: str) -> Path: """ resolved = self.validate(path) if not resolved.parent.exists(): + logger.warning( + TOOL_FS_PARENT_NOT_FOUND, + path=path, + ) msg = f"Parent directory does not exist: {path}" raise ValueError(msg) return resolved diff --git a/src/ai_company/tools/file_system/delete_file.py b/src/ai_company/tools/file_system/delete_file.py index 231c91ae57..699b8b182a 100644 --- a/src/ai_company/tools/file_system/delete_file.py +++ b/src/ai_company/tools/file_system/delete_file.py @@ -15,7 +15,17 @@ def _delete_sync(resolved: Path) -> int: - """Delete file synchronously, returning its size before deletion.""" + """Delete file synchronously, returning its size before deletion. + + Raises: + FileNotFoundError: If the file does not exist. + IsADirectoryError: If the path is a directory. + PermissionError: If the process lacks delete permission. + OSError: For other OS-level errors. + """ + if resolved.is_dir(): + msg = f"Is a directory: '{resolved}'" + raise IsADirectoryError(msg) size = resolved.stat().st_size resolved.unlink() return size @@ -25,8 +35,8 @@ class DeleteFileTool(BaseFileSystemTool): """Deletes a single file within the workspace. Directories cannot be deleted with this tool — only regular files. - This tool has ``require_elevated = True``, signalling to the engine - that elevated permissions are needed. + The ``require_elevated`` property is defined for future use by the + engine's permission system (not yet enforced). Examples: Delete a file:: @@ -60,7 +70,12 @@ def __init__(self, *, workspace_root: Path) -> None: @property def require_elevated(self) -> bool: - """Whether this tool requires elevated permissions.""" + """Whether this tool requires elevated permissions. + + Indicates this tool requires explicit approval before execution + due to its destructive nature. Not yet consumed by the engine; + defined for forward-compatibility. + """ return True async def execute( @@ -83,22 +98,22 @@ async def execute( except ValueError as exc: return ToolExecutionResult(content=str(exc), is_error=True) - if not resolved.exists(): + # Let _delete_sync handle all filesystem checks atomically + # rather than pre-checking exists()/is_dir() — avoids TOCTOU. + try: + size_bytes = await asyncio.to_thread(_delete_sync, resolved) + except FileNotFoundError: logger.warning(TOOL_FS_ERROR, path=user_path, error="not_found") return ToolExecutionResult( content=f"File not found: {user_path}", is_error=True, ) - - if resolved.is_dir(): + except IsADirectoryError: logger.warning(TOOL_FS_ERROR, path=user_path, error="is_directory") return ToolExecutionResult( content=f"Cannot delete directory (use a dedicated tool): {user_path}", is_error=True, ) - - try: - size_bytes = await asyncio.to_thread(_delete_sync, resolved) except PermissionError: logger.warning(TOOL_FS_ERROR, path=user_path, error="permission_denied") return ToolExecutionResult( diff --git a/src/ai_company/tools/file_system/edit_file.py b/src/ai_company/tools/file_system/edit_file.py index 8ccf5b7d5b..becdcaacd5 100644 --- a/src/ai_company/tools/file_system/edit_file.py +++ b/src/ai_company/tools/file_system/edit_file.py @@ -1,44 +1,53 @@ """Edit file tool — search-and-replace within workspace files.""" import asyncio -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Final from ai_company.observability import get_logger from ai_company.observability.events.tool import ( TOOL_FS_EDIT, TOOL_FS_EDIT_NOT_FOUND, TOOL_FS_ERROR, + TOOL_FS_NOOP, + TOOL_FS_SIZE_EXCEEDED, ) from ai_company.tools.base import ToolExecutionResult -from ai_company.tools.file_system._base_fs_tool import BaseFileSystemTool +from ai_company.tools.file_system._base_fs_tool import ( + BaseFileSystemTool, + _map_os_error, +) if TYPE_CHECKING: from pathlib import Path logger = get_logger(__name__) +MAX_EDIT_FILE_SIZE_BYTES: Final[int] = 1_048_576 # 1 MB + -def _edit_sync(resolved: Path, old_text: str, new_text: str) -> tuple[str, int]: +def _edit_sync(resolved: Path, old_text: str, new_text: str) -> int: """Perform search-and-replace synchronously. Returns: - Tuple of (new_content, occurrences_found). + Number of occurrences of *old_text* found in the file. """ content = resolved.read_text(encoding="utf-8") count = content.count(old_text) if count > 0: new_content = content.replace(old_text, new_text, 1) resolved.write_text(new_content, encoding="utf-8") - return content, count + return count class EditFileTool(BaseFileSystemTool): """Replaces the first occurrence of ``old_text`` with ``new_text``. - If ``old_text`` is not found, returns an error with a snippet of the - file content to help the LLM adjust its search string. When - multiple occurrences exist, only the first is replaced and a warning - is included in the output. + If ``old_text`` is not found, returns an error indicating that the + text was not found. When multiple occurrences exist, only the first + is replaced and a warning is included in the output. + + Returns immediately with no change if ``old_text`` and ``new_text`` + are identical. Examples: Replace text:: @@ -79,7 +88,7 @@ def __init__(self, *, workspace_root: Path) -> None: }, "new_text": { "type": "string", - "description": ("Replacement text (empty string to delete)"), + "description": "Replacement text (empty string to delete)", }, }, "required": ["path", "old_text", "new_text"], @@ -87,7 +96,7 @@ def __init__(self, *, workspace_root: Path) -> None: }, ) - async def execute( + async def execute( # noqa: PLR0911 self, *, arguments: dict[str, Any], @@ -106,6 +115,11 @@ async def execute( new_text: str = arguments["new_text"] if old_text == new_text: + logger.debug( + TOOL_FS_NOOP, + path=user_path, + reason="old_text_equals_new_text", + ) return ToolExecutionResult( content=f"No change needed in {user_path}: " "old_text and new_text are identical", @@ -121,9 +135,45 @@ async def execute( except ValueError as exc: return ToolExecutionResult(content=str(exc), is_error=True) + # Explicit is_dir() check: on Windows, stat() succeeds on dirs + # and read_text() raises PermissionError (not IsADirectoryError). + if resolved.is_dir(): + logger.warning(TOOL_FS_ERROR, path=user_path, error="is_directory") + return ToolExecutionResult( + content=f"Path is a directory, not a file: {user_path}", + is_error=True, + ) + + # Guard against loading excessively large files. + try: + stat_result = await asyncio.to_thread(resolved.stat) + except (FileNotFoundError, IsADirectoryError, PermissionError, OSError) as exc: + log_key, msg = _map_os_error(exc, user_path, "editing") + logger.warning(TOOL_FS_ERROR, path=user_path, error=log_key) + return ToolExecutionResult(content=msg, is_error=True) + + if stat_result.st_size > MAX_EDIT_FILE_SIZE_BYTES: + logger.warning( + TOOL_FS_SIZE_EXCEEDED, + path=user_path, + size_bytes=stat_result.st_size, + max_bytes=MAX_EDIT_FILE_SIZE_BYTES, + ) + return ToolExecutionResult( + content=( + f"File too large to edit: {user_path} " + f"({stat_result.st_size:,} bytes, " + f"max {MAX_EDIT_FILE_SIZE_BYTES:,})" + ), + is_error=True, + ) + try: - content, count = await asyncio.to_thread( - _edit_sync, resolved, old_text, new_text + count = await asyncio.to_thread( + _edit_sync, + resolved, + old_text, + new_text, ) except UnicodeDecodeError: logger.warning(TOOL_FS_ERROR, path=user_path, error="binary") @@ -132,38 +182,18 @@ async def execute( is_error=True, ) except (FileNotFoundError, IsADirectoryError, PermissionError, OSError) as exc: - _err_msgs: dict[type, tuple[str, str]] = { - FileNotFoundError: ( - "not_found", - f"File not found: {user_path}", - ), - IsADirectoryError: ( - "is_directory", - f"Path is a directory, not a file: {user_path}", - ), - PermissionError: ( - "permission_denied", - f"Permission denied: {user_path}", - ), - } - log_key, msg = _err_msgs.get( - type(exc), - (str(exc), f"OS error editing file: {user_path}"), - ) + log_key, msg = _map_os_error(exc, user_path, "editing") logger.warning(TOOL_FS_ERROR, path=user_path, error=log_key) return ToolExecutionResult(content=msg, is_error=True) if count == 0: - snippet = content[:500] logger.info( TOOL_FS_EDIT_NOT_FOUND, path=user_path, old_text_preview=old_text[:100], ) return ToolExecutionResult( - content=( - f"Text not found in {user_path}. File content preview:\n{snippet}" - ), + content=f"Text not found in {user_path}.", is_error=True, metadata={ "path": user_path, diff --git a/src/ai_company/tools/file_system/list_directory.py b/src/ai_company/tools/file_system/list_directory.py index 01e1ea0590..7d794fb712 100644 --- a/src/ai_company/tools/file_system/list_directory.py +++ b/src/ai_company/tools/file_system/list_directory.py @@ -1,10 +1,17 @@ """List directory tool — lists entries in a workspace directory.""" import asyncio +import itertools +import re from typing import TYPE_CHECKING, Any, Final from ai_company.observability import get_logger -from ai_company.observability.events.tool import TOOL_FS_ERROR, TOOL_FS_LIST +from ai_company.observability.events.tool import ( + TOOL_FS_ERROR, + TOOL_FS_GLOB_REJECTED, + TOOL_FS_LIST, + TOOL_FS_STAT_FAILED, +) from ai_company.tools.base import ToolExecutionResult from ai_company.tools.file_system._base_fs_tool import BaseFileSystemTool @@ -15,6 +22,9 @@ MAX_ENTRIES: Final[int] = 1000 +# Reject glob patterns that could traverse above the workspace. +_UNSAFE_GLOB_RE = re.compile(r"(^|[/\\])\.\.[/\\]|^\.\.$") + def _list_sync( resolved: Path, @@ -24,32 +34,49 @@ def _list_sync( recursive: bool, ) -> list[str]: """Collect directory entries synchronously.""" + glob_pattern = pattern or "*" if recursive: - glob_pattern = pattern or "*" - entries = sorted(resolved.rglob(glob_pattern)) + raw_iter = resolved.rglob(glob_pattern) elif pattern: - entries = sorted(resolved.glob(pattern)) + raw_iter = resolved.glob(glob_pattern) else: - entries = sorted(resolved.iterdir()) + raw_iter = resolved.iterdir() + + # Cap at MAX_ENTRIES + 1 to detect truncation without + # materialising the entire iterator. + entries = sorted(itertools.islice(raw_iter, MAX_ENTRIES + 1)) lines: list[str] = [] for entry in entries: try: + display = str(entry.relative_to(resolved)) if recursive else entry.name + if entry.is_symlink(): target = entry.resolve() if not target.is_relative_to(workspace_root): - lines.append(f"[SYMLINK] {entry.name} -> (outside workspace)") + lines.append(f"[SYMLINK] {display} -> (outside workspace)") continue if entry.is_dir(): - lines.append(f"[DIR] {entry.name}/") + lines.append(f"[DIR] {display}/") else: try: size = entry.stat().st_size - except OSError: - size = 0 - lines.append(f"[FILE] {entry.name} ({size} bytes)") - except OSError: + except OSError as stat_exc: + logger.warning( + TOOL_FS_STAT_FAILED, + path=str(entry), + error=str(stat_exc), + ) + lines.append(f"[FILE] {display} (unknown bytes)") + continue + lines.append(f"[FILE] {display} ({size} bytes)") + except OSError as exc: + logger.warning( + TOOL_FS_ERROR, + path=str(entry), + error=str(exc), + ) lines.append(f"[ERROR] {entry.name}") return lines @@ -60,7 +87,8 @@ class ListDirectoryTool(BaseFileSystemTool): Supports optional glob filtering and recursive listing. Output is sorted alphabetically with type prefixes (``[DIR]`` / ``[FILE]``). - Results are capped at 1000 entries to prevent excessive output. + Results are capped at ``MAX_ENTRIES`` (1000) entries to prevent + excessive output. Examples: List current directory:: @@ -124,12 +152,24 @@ async def execute( pattern: str | None = arguments.get("pattern") recursive: bool = arguments.get("recursive", False) + # Reject glob patterns that could traverse above the workspace. + if pattern and _UNSAFE_GLOB_RE.search(pattern): + logger.warning( + TOOL_FS_GLOB_REJECTED, + pattern=pattern, + ) + return ToolExecutionResult( + content=f"Unsafe glob pattern rejected: {pattern}", + is_error=True, + ) + try: resolved = self.path_validator.validate(user_path) except ValueError as exc: return ToolExecutionResult(content=str(exc), is_error=True) if not resolved.is_dir(): + logger.warning(TOOL_FS_ERROR, path=user_path, error="not_a_directory") return ToolExecutionResult( content=f"Not a directory: {user_path}", is_error=True, @@ -163,12 +203,14 @@ async def execute( dir_count = sum(1 for ln in lines if ln.startswith("[DIR]")) file_count = sum(1 for ln in lines if ln.startswith("[FILE]")) - output = "\n".join(lines) - if truncated: - output += f"\n\n[Truncated: showing first {MAX_ENTRIES} of more entries]" - if not lines: output = f"Directory is empty: {user_path}" + else: + output = "\n".join(lines) + if truncated: + output += ( + f"\n\n[Truncated: showing first {MAX_ENTRIES} of more entries]" + ) logger.info( TOOL_FS_LIST, diff --git a/src/ai_company/tools/file_system/read_file.py b/src/ai_company/tools/file_system/read_file.py index 18fe530c9c..2f810cd23b 100644 --- a/src/ai_company/tools/file_system/read_file.py +++ b/src/ai_company/tools/file_system/read_file.py @@ -1,7 +1,7 @@ """Read file tool — reads file content from the workspace. Supports optional line-range selection and enforces a maximum -file-size guard to prevent loading excessively large files. +file-size guard to prevent loading excessively large files into memory. """ import asyncio @@ -15,7 +15,10 @@ TOOL_FS_SIZE_EXCEEDED, ) from ai_company.tools.base import ToolExecutionResult -from ai_company.tools.file_system._base_fs_tool import BaseFileSystemTool +from ai_company.tools.file_system._base_fs_tool import ( + BaseFileSystemTool, + _map_os_error, +) if TYPE_CHECKING: from pathlib import Path @@ -25,23 +28,40 @@ MAX_FILE_SIZE_BYTES: Final[int] = 1_048_576 # 1 MB -def _read_sync(resolved: Path, start: int | None, end: int | None) -> str: - """Read file content synchronously, with optional line slicing.""" - raw = resolved.read_text(encoding="utf-8") +def _read_sync( + resolved: Path, + start: int | None, + end: int | None, + *, + max_bytes: int | None = None, +) -> str: + """Read file content synchronously, with optional line slicing. + + When *max_bytes* is set and no line range is requested, only the + first *max_bytes* bytes are read from disk (avoiding full-file + materialisation for oversized files). + """ if start is not None or end is not None: + raw = resolved.read_text(encoding="utf-8") lines = raw.splitlines(keepends=True) s = (start - 1) if start is not None else 0 e = end if end is not None else len(lines) - raw = "".join(lines[s:e]) - return raw + return "".join(lines[s:e]) + + if max_bytes is not None: + with resolved.open(encoding="utf-8") as fh: + return fh.read(max_bytes) + + return resolved.read_text(encoding="utf-8") class ReadFileTool(BaseFileSystemTool): """Reads the content of a file within the workspace. Supports optional ``start_line`` / ``end_line`` for partial reads. - Files exceeding 1 MB are truncated with a warning. Binary files - (non-UTF-8) produce an error result. + Files exceeding 1 MB are read in bounded fashion: when no line + range is specified only the first 1 MB is returned (with a + truncation notice). Binary (non-UTF-8) files produce an error. Examples: Read an entire file:: @@ -109,47 +129,45 @@ async def execute( except ValueError as exc: return ToolExecutionResult(content=str(exc), is_error=True) - try: - size = await asyncio.to_thread(resolved.stat) - size_bytes = size.st_size - - if size_bytes > MAX_FILE_SIZE_BYTES: - logger.warning( - TOOL_FS_SIZE_EXCEEDED, - path=user_path, - size_bytes=size_bytes, - max_bytes=MAX_FILE_SIZE_BYTES, - ) - - content = await asyncio.to_thread( - _read_sync, resolved, start_line, end_line + # Explicit is_dir() check: on Windows, stat() succeeds on dirs + # and read_text() raises PermissionError (not IsADirectoryError). + if resolved.is_dir(): + logger.warning(TOOL_FS_ERROR, path=user_path, error="is_directory") + return ToolExecutionResult( + content=f"Path is a directory, not a file: {user_path}", + is_error=True, ) - if size_bytes > MAX_FILE_SIZE_BYTES and not (start_line or end_line): - content = content[:MAX_FILE_SIZE_BYTES] - content += ( - f"\n\n[Truncated: file is {size_bytes:,} bytes, " - f"showing first {MAX_FILE_SIZE_BYTES:,}]" - ) + try: + stat_result = await asyncio.to_thread(resolved.stat) + size_bytes = stat_result.st_size + except (FileNotFoundError, IsADirectoryError, PermissionError, OSError) as exc: + log_key, msg = _map_os_error(exc, user_path, "reading") + logger.warning(TOOL_FS_ERROR, path=user_path, error=log_key) + return ToolExecutionResult(content=msg, is_error=True) - line_count = content.count("\n") + ( - 1 if content and not content.endswith("\n") else 0 - ) + oversized = size_bytes > MAX_FILE_SIZE_BYTES + has_line_range = start_line is not None or end_line is not None - logger.info( - TOOL_FS_READ, + if oversized: + logger.warning( + TOOL_FS_SIZE_EXCEEDED, path=user_path, size_bytes=size_bytes, - line_count=line_count, + max_bytes=MAX_FILE_SIZE_BYTES, ) - return ToolExecutionResult( - content=content, - metadata={ - "path": user_path, - "size_bytes": size_bytes, - "line_count": line_count, - }, + # When oversized and no line range, read only MAX_FILE_SIZE_BYTES + # to avoid loading the whole file into memory. + max_bytes = MAX_FILE_SIZE_BYTES if oversized and not has_line_range else None + + try: + content = await asyncio.to_thread( + _read_sync, + resolved, + start_line, + end_line, + max_bytes=max_bytes, ) except UnicodeDecodeError: logger.warning(TOOL_FS_BINARY_DETECTED, path=user_path) @@ -158,23 +176,32 @@ async def execute( is_error=True, ) except (FileNotFoundError, IsADirectoryError, PermissionError, OSError) as exc: - _err_msgs: dict[type, tuple[str, str]] = { - FileNotFoundError: ( - "not_found", - f"File not found: {user_path}", - ), - IsADirectoryError: ( - "is_directory", - f"Path is a directory, not a file: {user_path}", - ), - PermissionError: ( - "permission_denied", - f"Permission denied: {user_path}", - ), - } - log_key, msg = _err_msgs.get( - type(exc), - (str(exc), f"OS error reading file: {user_path}"), - ) + log_key, msg = _map_os_error(exc, user_path, "reading") logger.warning(TOOL_FS_ERROR, path=user_path, error=log_key) return ToolExecutionResult(content=msg, is_error=True) + + if oversized and not has_line_range: + content += ( + f"\n\n[Truncated: file is {size_bytes:,} bytes, " + f"showing first {MAX_FILE_SIZE_BYTES:,}]" + ) + + line_count = content.count("\n") + ( + 1 if content and not content.endswith("\n") else 0 + ) + + logger.info( + TOOL_FS_READ, + path=user_path, + size_bytes=size_bytes, + line_count=line_count, + ) + + return ToolExecutionResult( + content=content, + metadata={ + "path": user_path, + "size_bytes": size_bytes, + "line_count": line_count, + }, + ) diff --git a/src/ai_company/tools/file_system/write_file.py b/src/ai_company/tools/file_system/write_file.py index 76d372a20a..3a1255848d 100644 --- a/src/ai_company/tools/file_system/write_file.py +++ b/src/ai_company/tools/file_system/write_file.py @@ -1,10 +1,14 @@ """Write file tool — creates or overwrites files in the workspace.""" import asyncio -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Final from ai_company.observability import get_logger -from ai_company.observability.events.tool import TOOL_FS_ERROR, TOOL_FS_WRITE +from ai_company.observability.events.tool import ( + TOOL_FS_ERROR, + TOOL_FS_SIZE_EXCEEDED, + TOOL_FS_WRITE, +) from ai_company.tools.base import ToolExecutionResult from ai_company.tools.file_system._base_fs_tool import BaseFileSystemTool @@ -13,6 +17,8 @@ logger = get_logger(__name__) +MAX_WRITE_SIZE_BYTES: Final[int] = 10_485_760 # 10 MB + def _write_sync(resolved: Path, content: str, *, create_dirs: bool) -> tuple[int, bool]: """Write content to file synchronously. @@ -80,7 +86,7 @@ def __init__(self, *, workspace_root: Path) -> None: }, ) - async def execute( + async def execute( # noqa: PLR0911 self, *, arguments: dict[str, Any], @@ -98,6 +104,22 @@ async def execute( content: str = arguments["content"] create_dirs: bool = arguments.get("create_directories", False) + content_size = len(content.encode("utf-8")) + if content_size > MAX_WRITE_SIZE_BYTES: + logger.warning( + TOOL_FS_SIZE_EXCEEDED, + path=user_path, + size_bytes=content_size, + max_bytes=MAX_WRITE_SIZE_BYTES, + ) + return ToolExecutionResult( + content=( + f"Content too large to write: {content_size:,} bytes " + f"(max {MAX_WRITE_SIZE_BYTES:,})" + ), + is_error=True, + ) + try: if create_dirs: resolved = self.path_validator.validate(user_path) @@ -106,27 +128,19 @@ async def execute( except ValueError as exc: return ToolExecutionResult(content=str(exc), is_error=True) + # Explicit is_dir() check: on Windows, write_text() on a directory + # raises PermissionError (not IsADirectoryError). + if resolved.is_dir(): + logger.warning(TOOL_FS_ERROR, path=user_path, error="is_directory") + return ToolExecutionResult( + content=f"Path is a directory, not a file: {user_path}", + is_error=True, + ) + try: bytes_written, created = await asyncio.to_thread( _write_sync, resolved, content, create_dirs=create_dirs ) - - action = "Created" if created else "Updated" - logger.info( - TOOL_FS_WRITE, - path=user_path, - bytes_written=bytes_written, - created=created, - ) - - return ToolExecutionResult( - content=f"{action} {user_path} ({bytes_written} bytes)", - metadata={ - "path": user_path, - "bytes_written": bytes_written, - "created": created, - }, - ) except IsADirectoryError: logger.warning(TOOL_FS_ERROR, path=user_path, error="is_directory") return ToolExecutionResult( @@ -145,3 +159,20 @@ async def execute( content=f"OS error writing file: {user_path}", is_error=True, ) + + action = "Created" if created else "Updated" + logger.info( + TOOL_FS_WRITE, + path=user_path, + bytes_written=bytes_written, + created=created, + ) + + return ToolExecutionResult( + content=f"{action} {user_path} ({bytes_written} bytes)", + metadata={ + "path": user_path, + "bytes_written": bytes_written, + "created": created, + }, + ) diff --git a/tests/unit/tools/file_system/test_delete_file.py b/tests/unit/tools/file_system/test_delete_file.py index 7756b26caa..5ff6c5cf89 100644 --- a/tests/unit/tools/file_system/test_delete_file.py +++ b/tests/unit/tools/file_system/test_delete_file.py @@ -63,3 +63,41 @@ async def test_delete_empty_file( assert not result.is_error assert not (workspace / "empty.txt").exists() assert result.metadata["size_bytes"] == 0 + + async def test_delete_permission_error( + self, + workspace: Path, + delete_tool: DeleteFileTool, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """PermissionError during deletion is handled gracefully.""" + from ai_company.tools.file_system import delete_file as mod + + def _fake_delete_sync(resolved: Path) -> int: + raise PermissionError + + monkeypatch.setattr(mod, "_delete_sync", _fake_delete_sync) + result = await delete_tool.execute( + arguments={"path": "hello.txt"}, + ) + assert result.is_error + assert "Permission denied" in result.content + + async def test_delete_os_error( + self, + workspace: Path, + delete_tool: DeleteFileTool, + monkeypatch: pytest.MonkeyPatch, + ) -> None: + """Generic OSError during deletion is handled gracefully.""" + from ai_company.tools.file_system import delete_file as mod + + def _fake_delete_sync(resolved: Path) -> int: + raise OSError + + monkeypatch.setattr(mod, "_delete_sync", _fake_delete_sync) + result = await delete_tool.execute( + arguments={"path": "hello.txt"}, + ) + assert result.is_error + assert "OS error" in result.content diff --git a/tests/unit/tools/file_system/test_edit_file.py b/tests/unit/tools/file_system/test_edit_file.py index 623ee8a0ed..79523ff1f6 100644 --- a/tests/unit/tools/file_system/test_edit_file.py +++ b/tests/unit/tools/file_system/test_edit_file.py @@ -53,6 +53,8 @@ async def test_text_not_found(self, edit_tool: EditFileTool) -> None: assert result.is_error assert "Text not found" in result.content assert result.metadata["occurrences_found"] == 0 + # Verify no file content snippet is leaked (#16) + assert "Hello" not in result.content async def test_multiple_occurrences_replaces_first( self, workspace: Path, edit_tool: EditFileTool @@ -133,3 +135,32 @@ async def test_edit_preserves_other_content( assert not result.is_error content = (workspace / "multi.txt").read_text(encoding="utf-8") assert content == "line1\nLINE_TWO\nline3\n" + + async def test_edit_directory_errors(self, edit_tool: EditFileTool) -> None: + result = await edit_tool.execute( + arguments={ + "path": "subdir", + "old_text": "a", + "new_text": "b", + } + ) + assert result.is_error + assert "directory" in result.content.lower() + + async def test_edit_large_file_rejected( + self, workspace: Path, edit_tool: EditFileTool + ) -> None: + """Files exceeding the size guard are rejected.""" + from ai_company.tools.file_system.edit_file import MAX_EDIT_FILE_SIZE_BYTES + + big = "x" * (MAX_EDIT_FILE_SIZE_BYTES + 100) + (workspace / "huge.txt").write_text(big, encoding="utf-8") + result = await edit_tool.execute( + arguments={ + "path": "huge.txt", + "old_text": "x", + "new_text": "y", + } + ) + assert result.is_error + assert "too large" in result.content.lower() diff --git a/tests/unit/tools/file_system/test_list_directory.py b/tests/unit/tools/file_system/test_list_directory.py index 8d0369243a..69ff38a58b 100644 --- a/tests/unit/tools/file_system/test_list_directory.py +++ b/tests/unit/tools/file_system/test_list_directory.py @@ -1,5 +1,7 @@ """Tests for ListDirectoryTool.""" +import os +import sys from typing import TYPE_CHECKING import pytest @@ -12,6 +14,8 @@ if TYPE_CHECKING: from pathlib import Path +_SYMLINK_SUPPORTED = not (sys.platform == "win32" and not os.environ.get("CI")) + @pytest.mark.unit class TestListDirectoryExecution: @@ -48,6 +52,28 @@ async def test_recursive_listing(self, list_tool: ListDirectoryTool) -> None: assert not result.is_error assert "nested.py" in result.content + async def test_recursive_shows_relative_paths( + self, list_tool: ListDirectoryTool + ) -> None: + """Recursive listing must include directory context in display.""" + result = await list_tool.execute(arguments={"path": ".", "recursive": True}) + assert not result.is_error + # The nested file should show its subdirectory path + assert "subdir" in result.content + assert "nested.py" in result.content + + async def test_recursive_with_pattern( + self, workspace: Path, list_tool: ListDirectoryTool + ) -> None: + """Recursive listing with glob pattern.""" + result = await list_tool.execute( + arguments={"path": ".", "pattern": "*.py", "recursive": True} + ) + assert not result.is_error + assert "nested.py" in result.content + # .txt files should be filtered out + assert "hello.txt" not in result.content + async def test_empty_directory( self, workspace: Path, list_tool: ListDirectoryTool ) -> None: @@ -71,9 +97,6 @@ async def test_metadata_counts(self, list_tool: ListDirectoryTool) -> None: assert not result.is_error assert result.metadata["directories"] >= 1 assert result.metadata["files"] >= 1 - assert result.metadata["total_entries"] == ( - result.metadata["directories"] + result.metadata["files"] - ) async def test_file_shows_size(self, list_tool: ListDirectoryTool) -> None: result = await list_tool.execute(arguments={"path": "."}) @@ -89,3 +112,42 @@ async def test_large_directory_truncation(self, workspace: Path) -> None: result = await tool.execute(arguments={"path": "big"}) assert not result.is_error assert "Truncated" in result.content + + async def test_unsafe_glob_pattern_rejected( + self, list_tool: ListDirectoryTool + ) -> None: + """Glob patterns with .. must be rejected.""" + result = await list_tool.execute(arguments={"path": ".", "pattern": "../../*"}) + assert result.is_error + assert "Unsafe glob pattern" in result.content + + async def test_unsafe_glob_mid_path_rejected( + self, list_tool: ListDirectoryTool + ) -> None: + result = await list_tool.execute(arguments={"path": ".", "pattern": "foo/../*"}) + assert result.is_error + assert "Unsafe glob pattern" in result.content + + +@pytest.mark.unit +class TestListDirectorySymlinks: + """Symlink handling tests.""" + + @pytest.mark.skipif( + not _SYMLINK_SUPPORTED, + reason="Symlinks require privileges on Windows outside CI", + ) + async def test_symlink_outside_workspace_annotated(self, workspace: Path) -> None: + """Symlinks pointing outside workspace show annotation.""" + outside = workspace.parent / "outside_target" + outside.mkdir(exist_ok=True) + (outside / "secret.txt").write_text("secret", encoding="utf-8") + + link = workspace / "escape_link" + link.symlink_to(outside / "secret.txt") + + tool = ListDirectoryTool(workspace_root=workspace) + result = await tool.execute(arguments={"path": "."}) + assert not result.is_error + assert "[SYMLINK]" in result.content + assert "outside workspace" in result.content diff --git a/tests/unit/tools/file_system/test_path_validator.py b/tests/unit/tools/file_system/test_path_validator.py index 056ee040d4..430117b357 100644 --- a/tests/unit/tools/file_system/test_path_validator.py +++ b/tests/unit/tools/file_system/test_path_validator.py @@ -1,5 +1,7 @@ """Tests for PathValidator.""" +import os +import sys from typing import TYPE_CHECKING import pytest @@ -9,6 +11,8 @@ if TYPE_CHECKING: from pathlib import Path +_SYMLINK_SUPPORTED = not (sys.platform == "win32" and not os.environ.get("CI")) + @pytest.mark.unit class TestPathValidatorInit: @@ -81,3 +85,44 @@ def test_traversal_still_rejected(self, tmp_path: Path) -> None: pv = PathValidator(tmp_path) with pytest.raises(ValueError, match="escapes workspace"): pv.validate_parent_exists("../../escape.txt") + + +@pytest.mark.unit +class TestSymlinkHandling: + """Symlink-specific validation tests.""" + + @pytest.mark.skipif( + not _SYMLINK_SUPPORTED, + reason="Symlinks require privileges on Windows outside CI", + ) + def test_symlink_outside_workspace_rejected(self, tmp_path: Path) -> None: + """A symlink pointing outside the workspace must be rejected.""" + outside = tmp_path / "outside" + outside.mkdir() + (outside / "secret.txt").write_text("secret", encoding="utf-8") + + workspace = tmp_path / "workspace" + workspace.mkdir() + link = workspace / "escape_link" + link.symlink_to(outside / "secret.txt") + + pv = PathValidator(workspace) + with pytest.raises(ValueError, match="escapes workspace"): + pv.validate("escape_link") + + @pytest.mark.skipif( + not _SYMLINK_SUPPORTED, + reason="Symlinks require privileges on Windows outside CI", + ) + def test_symlink_inside_workspace_allowed(self, tmp_path: Path) -> None: + """A symlink pointing within the workspace should be allowed.""" + workspace = tmp_path / "workspace" + workspace.mkdir() + real_file = workspace / "real.txt" + real_file.write_text("ok", encoding="utf-8") + link = workspace / "link.txt" + link.symlink_to(real_file) + + pv = PathValidator(workspace) + result = pv.validate("link.txt") + assert result == real_file.resolve() diff --git a/tests/unit/tools/file_system/test_read_file.py b/tests/unit/tools/file_system/test_read_file.py index 19d53fdf97..454a75ef94 100644 --- a/tests/unit/tools/file_system/test_read_file.py +++ b/tests/unit/tools/file_system/test_read_file.py @@ -82,6 +82,7 @@ async def test_read_end_line_only( assert not result.is_error assert "a" in result.content assert "b" in result.content + assert "c" not in result.content async def test_file_not_found(self, read_tool: ReadFileTool) -> None: result = await read_tool.execute(arguments={"path": "nonexistent.txt"}) @@ -96,6 +97,7 @@ async def test_path_traversal_blocked(self, read_tool: ReadFileTool) -> None: async def test_read_directory_errors(self, read_tool: ReadFileTool) -> None: result = await read_tool.execute(arguments={"path": "subdir"}) assert result.is_error + assert "directory" in result.content.lower() async def test_binary_file_errors( self, workspace: Path, read_tool: ReadFileTool @@ -114,3 +116,17 @@ async def test_large_file_truncated( assert not result.is_error assert "Truncated" in result.content assert len(result.content) < len(big_content) + + async def test_large_file_not_truncated_with_line_range( + self, workspace: Path, read_tool: ReadFileTool + ) -> None: + """When start_line/end_line is given, truncation is bypassed.""" + lines = ["line\n"] * (MAX_FILE_SIZE_BYTES // 4) + big_content = "".join(lines) + (workspace / "big_lines.txt").write_text(big_content, encoding="utf-8") + result = await read_tool.execute( + arguments={"path": "big_lines.txt", "start_line": 1, "end_line": 5} + ) + assert not result.is_error + assert "Truncated" not in result.content + assert "line" in result.content diff --git a/tests/unit/tools/file_system/test_write_file.py b/tests/unit/tools/file_system/test_write_file.py index 02f3e64df0..0b54c73d15 100644 --- a/tests/unit/tools/file_system/test_write_file.py +++ b/tests/unit/tools/file_system/test_write_file.py @@ -83,3 +83,22 @@ async def test_bytes_written_metadata(self, write_tool: WriteFileTool) -> None: ) assert not result.is_error assert result.metadata["bytes_written"] > 0 + + async def test_write_to_directory_errors(self, write_tool: WriteFileTool) -> None: + """Writing to a path that is a directory returns an error.""" + result = await write_tool.execute(arguments={"path": "subdir", "content": "x"}) + assert result.is_error + assert "directory" in result.content.lower() + + async def test_write_too_large_content_rejected( + self, write_tool: WriteFileTool + ) -> None: + """Content exceeding MAX_WRITE_SIZE_BYTES is rejected.""" + from ai_company.tools.file_system.write_file import MAX_WRITE_SIZE_BYTES + + big = "x" * (MAX_WRITE_SIZE_BYTES + 100) + result = await write_tool.execute( + arguments={"path": "huge.txt", "content": big} + ) + assert result.is_error + assert "too large" in result.content.lower() From be7a6d075fe60ee6ff28d8135f63cbf8e1e9b36c Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 7 Mar 2026 09:58:24 +0100 Subject: [PATCH 3/3] fix: address 22 PR review findings from local agents and external reviewers Security fixes: - Block absolute paths in PathValidator before resolve (path traversal) - Wrap resolve() in try/except OSError for malformed paths - Check containment for ALL directory entries, not just symlinks - Reject absolute glob patterns in list_directory - Block oversized files from line-range reads (DoS prevention) - Add minLength:1 + runtime check for empty old_text in edit (destructive no-op) Reliability fixes: - Atomic writes via tempfile+fsync+replace in edit_file and write_file - Stable "os_error" log key + detailed user message in _map_os_error - Distinguish missing path from non-directory in list_directory - Validate start_line <= end_line in read_file - Compute line_count before appending truncation notice Information leak fix: - Log old_text length instead of preview content in edit_file Documentation: - Add docstrings to _read_sync, _edit_sync, _write_sync, _list_sync - Update DESIGN_SPEC.md file system tool category to include "list" - Update list_directory class docstring with all type prefixes Tests: - Add pytestmark timeout(30) to all 6 test modules - Add test_empty_old_text_rejected, test_start_line_greater_than_end_line - Fix test assertions for new error messages and behavior - Add encoding="utf-8" to write_text calls in path validator tests Reviewed-by: CodeRabbit, Gemini, Greptile, Copilot Co-Authored-By: Claude Opus 4.6 --- DESIGN_SPEC.md | 2 +- .../tools/file_system/_base_fs_tool.py | 8 +-- .../tools/file_system/_path_validator.py | 19 +++++- src/ai_company/tools/file_system/edit_file.py | 44 +++++++++++-- .../tools/file_system/list_directory.py | 64 ++++++++++++++++--- src/ai_company/tools/file_system/read_file.py | 57 +++++++++++++---- .../tools/file_system/write_file.py | 32 +++++++++- .../tools/file_system/test_delete_file.py | 2 + .../unit/tools/file_system/test_edit_file.py | 14 ++++ .../tools/file_system/test_list_directory.py | 2 + .../tools/file_system/test_path_validator.py | 10 +-- .../unit/tools/file_system/test_read_file.py | 25 ++++++-- .../unit/tools/file_system/test_write_file.py | 2 + 13 files changed, 237 insertions(+), 44 deletions(-) diff --git a/DESIGN_SPEC.md b/DESIGN_SPEC.md index 9ee39e3196..f7ca0ee8e3 100644 --- a/DESIGN_SPEC.md +++ b/DESIGN_SPEC.md @@ -1624,7 +1624,7 @@ When coordination metrics collection is enabled, the system can optionally class | Category | Tools | Typical Roles | |----------|-------|---------------| -| **File System** | Read, write, edit, delete files | All developers, writers | +| **File System** | Read, write, edit, list, delete files | All developers, writers | | **Code Execution** | Run code in sandboxed environments | Developers, QA | | **Version Control** | Git operations, PR management | Developers, DevOps | | **Web** | HTTP requests, web scraping, search | Researchers, analysts | diff --git a/src/ai_company/tools/file_system/_base_fs_tool.py b/src/ai_company/tools/file_system/_base_fs_tool.py index 93ea5a752a..5e40626aae 100644 --- a/src/ai_company/tools/file_system/_base_fs_tool.py +++ b/src/ai_company/tools/file_system/_base_fs_tool.py @@ -8,15 +8,12 @@ from typing import TYPE_CHECKING, Any from ai_company.core.enums import ToolCategory -from ai_company.observability import get_logger from ai_company.tools.base import BaseTool from ai_company.tools.file_system._path_validator import PathValidator if TYPE_CHECKING: from pathlib import Path -logger = get_logger(__name__) - def _map_os_error(exc: OSError, user_path: str, verb: str) -> tuple[str, str]: """Map an OS error to ``(log_key, user_message)`` for FS operations. @@ -24,7 +21,8 @@ def _map_os_error(exc: OSError, user_path: str, verb: str) -> tuple[str, str]: Args: exc: The caught OS-level exception. user_path: The original user-supplied path string. - verb: Action verb for the fallback message (e.g. ``"reading"``). + verb: Action verb for the fallback message + (e.g. ``"reading"``, ``"editing"``). Returns: A two-tuple of (structured log key, human-readable message). @@ -35,7 +33,7 @@ def _map_os_error(exc: OSError, user_path: str, verb: str) -> tuple[str, str]: return "is_directory", f"Path is a directory, not a file: {user_path}" if isinstance(exc, PermissionError): return "permission_denied", f"Permission denied: {user_path}" - return str(exc), f"OS error {verb} file: {user_path}" + return "os_error", f"OS error {verb} file '{user_path}': {exc}" class BaseFileSystemTool(BaseTool, ABC): diff --git a/src/ai_company/tools/file_system/_path_validator.py b/src/ai_company/tools/file_system/_path_validator.py index f341f01027..a20b03032c 100644 --- a/src/ai_company/tools/file_system/_path_validator.py +++ b/src/ai_company/tools/file_system/_path_validator.py @@ -6,6 +6,7 @@ symlinks, or absolute paths outside the workspace. """ +from pathlib import PurePosixPath, PureWindowsPath from typing import TYPE_CHECKING from ai_company.observability import get_logger @@ -69,13 +70,29 @@ def validate(self, path: str) -> Path: Raises: ValueError: If the resolved path escapes the workspace. """ + # Reject absolute paths outright — agents must use relative paths. + if PurePosixPath(path).is_absolute() or PureWindowsPath(path).is_absolute(): + logger.warning(TOOL_FS_PATH_VIOLATION, user_path=path) + msg = f"Absolute paths not allowed: {path}" + raise ValueError(msg) + # NOTE: There is an inherent TOCTOU gap between this validation # and the actual file operation (which runs in asyncio.to_thread). # A concurrent process could swap in a symlink between validation # and use. Full mitigation requires OS-level sandboxing (e.g. # openat2 RESOLVE_BENEATH on Linux). User-space path validation # is a best-effort defence-in-depth layer. - resolved = (self._workspace_root / path).resolve() + try: + resolved = (self._workspace_root / path).resolve() + except OSError as exc: + logger.warning( + TOOL_FS_PATH_VIOLATION, + user_path=path, + error=str(exc), + ) + msg = f"Invalid path: {path} ({exc})" + raise ValueError(msg) from exc + if not resolved.is_relative_to(self._workspace_root): logger.warning( TOOL_FS_PATH_VIOLATION, diff --git a/src/ai_company/tools/file_system/edit_file.py b/src/ai_company/tools/file_system/edit_file.py index becdcaacd5..023695b465 100644 --- a/src/ai_company/tools/file_system/edit_file.py +++ b/src/ai_company/tools/file_system/edit_file.py @@ -1,6 +1,9 @@ """Edit file tool — search-and-replace within workspace files.""" import asyncio +import os +import pathlib +import tempfile from typing import TYPE_CHECKING, Any, Final from ai_company.observability import get_logger @@ -28,14 +31,40 @@ def _edit_sync(resolved: Path, old_text: str, new_text: str) -> int: """Perform search-and-replace synchronously. + Uses an atomic write pattern (temp file + replace) so that a crash + or disk-full during the write does not corrupt the original file. + + Args: + resolved: Resolved file path within the workspace. + old_text: Non-empty text to search for. + new_text: Replacement text (may be empty to delete). + Returns: Number of occurrences of *old_text* found in the file. + + Raises: + UnicodeDecodeError: If the file contains non-UTF-8 bytes. + FileNotFoundError: If the file does not exist. + PermissionError: If the process lacks read/write permission. + OSError: For other OS-level I/O failures. """ content = resolved.read_text(encoding="utf-8") count = content.count(old_text) if count > 0: new_content = content.replace(old_text, new_text, 1) - resolved.write_text(new_content, encoding="utf-8") + fd, tmp_path = tempfile.mkstemp( + dir=str(resolved.parent), + suffix=".tmp", + ) + try: + with os.fdopen(fd, "w", encoding="utf-8") as fh: + fh.write(new_content) + fh.flush() + os.fsync(fh.fileno()) + pathlib.Path(tmp_path).replace(resolved) + except BaseException: + pathlib.Path(tmp_path).unlink(missing_ok=True) + raise return count @@ -84,6 +113,7 @@ def __init__(self, *, workspace_root: Path) -> None: }, "old_text": { "type": "string", + "minLength": 1, "description": "Exact text to find", }, "new_text": { @@ -96,7 +126,7 @@ def __init__(self, *, workspace_root: Path) -> None: }, ) - async def execute( # noqa: PLR0911 + async def execute( # noqa: C901, PLR0911 self, *, arguments: dict[str, Any], @@ -114,6 +144,12 @@ async def execute( # noqa: PLR0911 old_text: str = arguments["old_text"] new_text: str = arguments["new_text"] + if not old_text: + return ToolExecutionResult( + content="old_text cannot be empty", + is_error=True, + ) + if old_text == new_text: logger.debug( TOOL_FS_NOOP, @@ -187,10 +223,10 @@ async def execute( # noqa: PLR0911 return ToolExecutionResult(content=msg, is_error=True) if count == 0: - logger.info( + logger.warning( TOOL_FS_EDIT_NOT_FOUND, path=user_path, - old_text_preview=old_text[:100], + old_text_len=len(old_text), ) return ToolExecutionResult( content=f"Text not found in {user_path}.", diff --git a/src/ai_company/tools/file_system/list_directory.py b/src/ai_company/tools/file_system/list_directory.py index 7d794fb712..894985f94a 100644 --- a/src/ai_company/tools/file_system/list_directory.py +++ b/src/ai_company/tools/file_system/list_directory.py @@ -3,6 +3,7 @@ import asyncio import itertools import re +from pathlib import PurePosixPath, PureWindowsPath from typing import TYPE_CHECKING, Any, Final from ai_company.observability import get_logger @@ -33,7 +34,23 @@ def _list_sync( *, recursive: bool, ) -> list[str]: - """Collect directory entries synchronously.""" + """Collect directory entries synchronously. + + Args: + resolved: Resolved directory path within the workspace. + workspace_root: Workspace root for containment checks. + pattern: Optional glob filter (e.g. ``"*.py"``). + recursive: Whether to recurse into subdirectories. + + Returns: + A list of formatted strings with type prefixes + (``[DIR]``, ``[FILE]``, ``[SYMLINK]``, ``[ERROR]``). + Capped at ``MAX_ENTRIES + 1`` to allow truncation detection. + + Raises: + PermissionError: If the process lacks read permission. + OSError: For other OS-level I/O failures. + """ glob_pattern = pattern or "*" if recursive: raw_iter = resolved.rglob(glob_pattern) @@ -51,11 +68,19 @@ def _list_sync( try: display = str(entry.relative_to(resolved)) if recursive else entry.name + # Check containment for ALL entries (not just symlinks) + # to prevent leaking files via symlinked directories. + entry_resolved = entry.resolve() + if not entry_resolved.is_relative_to(workspace_root): + if entry.is_symlink(): + lines.append( + f"[SYMLINK] {display} -> (outside workspace)", + ) + continue + if entry.is_symlink(): - target = entry.resolve() - if not target.is_relative_to(workspace_root): - lines.append(f"[SYMLINK] {display} -> (outside workspace)") - continue + lines.append(f"[SYMLINK] {display}") + continue if entry.is_dir(): lines.append(f"[DIR] {display}/") @@ -77,7 +102,7 @@ def _list_sync( path=str(entry), error=str(exc), ) - lines.append(f"[ERROR] {entry.name}") + lines.append(f"[ERROR] {entry.name} ({exc})") return lines @@ -86,7 +111,8 @@ class ListDirectoryTool(BaseFileSystemTool): """Lists files and directories within the workspace. Supports optional glob filtering and recursive listing. Output is - sorted alphabetically with type prefixes (``[DIR]`` / ``[FILE]``). + sorted alphabetically with type prefixes (``[DIR]`` / ``[FILE]`` / + ``[SYMLINK]`` / ``[ERROR]``). Results are capped at ``MAX_ENTRIES`` (1000) entries to prevent excessive output. @@ -134,7 +160,7 @@ def __init__(self, *, workspace_root: Path) -> None: }, ) - async def execute( + async def execute( # noqa: C901, PLR0911 self, *, arguments: dict[str, Any], @@ -163,11 +189,33 @@ async def execute( is_error=True, ) + # Reject absolute glob patterns — they cause + # NotImplementedError in pathlib. + if pattern and ( + PurePosixPath(pattern).is_absolute() + or PureWindowsPath(pattern).is_absolute() + ): + logger.warning( + TOOL_FS_GLOB_REJECTED, + pattern=pattern, + ) + return ToolExecutionResult( + content=f"Unsafe glob pattern rejected: {pattern}", + is_error=True, + ) + try: resolved = self.path_validator.validate(user_path) except ValueError as exc: return ToolExecutionResult(content=str(exc), is_error=True) + if not resolved.exists(): + logger.warning(TOOL_FS_ERROR, path=user_path, error="not_found") + return ToolExecutionResult( + content=f"Path not found: {user_path}", + is_error=True, + ) + if not resolved.is_dir(): logger.warning(TOOL_FS_ERROR, path=user_path, error="not_a_directory") return ToolExecutionResult( diff --git a/src/ai_company/tools/file_system/read_file.py b/src/ai_company/tools/file_system/read_file.py index 2f810cd23b..82c5a79555 100644 --- a/src/ai_company/tools/file_system/read_file.py +++ b/src/ai_company/tools/file_system/read_file.py @@ -37,9 +37,22 @@ def _read_sync( ) -> str: """Read file content synchronously, with optional line slicing. - When *max_bytes* is set and no line range is requested, only the - first *max_bytes* bytes are read from disk (avoiding full-file - materialisation for oversized files). + Args: + resolved: Resolved file path within the workspace. + start: First line to return (1-based inclusive), or ``None``. + end: Last line to return (1-based inclusive), or ``None``. + max_bytes: When set, only the first *max_bytes* characters are + read via text-mode ``read()`` (approximate byte cap for + oversized files without a line range). + + Returns: + The file content (possibly sliced or truncated). + + Raises: + UnicodeDecodeError: If the file contains non-UTF-8 bytes. + FileNotFoundError: If the file does not exist. + PermissionError: If the process lacks read permission. + OSError: For other OS-level I/O failures. """ if start is not None or end is not None: raw = resolved.read_text(encoding="utf-8") @@ -106,7 +119,7 @@ def __init__(self, *, workspace_root: Path) -> None: }, ) - async def execute( + async def execute( # noqa: PLR0911 self, *, arguments: dict[str, Any], @@ -124,6 +137,15 @@ async def execute( start_line: int | None = arguments.get("start_line") end_line: int | None = arguments.get("end_line") + if start_line is not None and end_line is not None and start_line > end_line: + return ToolExecutionResult( + content=( + f"Invalid line range: start_line ({start_line}) " + f"must be <= end_line ({end_line})" + ), + is_error=True, + ) + try: resolved = self.path_validator.validate(user_path) except ValueError as exc: @@ -156,10 +178,19 @@ async def execute( size_bytes=size_bytes, max_bytes=MAX_FILE_SIZE_BYTES, ) - - # When oversized and no line range, read only MAX_FILE_SIZE_BYTES - # to avoid loading the whole file into memory. - max_bytes = MAX_FILE_SIZE_BYTES if oversized and not has_line_range else None + if has_line_range: + return ToolExecutionResult( + content=( + f"File too large for line-range read: " + f"{user_path} ({size_bytes:,} bytes, " + f"max {MAX_FILE_SIZE_BYTES:,})" + ), + is_error=True, + ) + + # When oversized, read only MAX_FILE_SIZE_BYTES to avoid + # loading the whole file into memory. + max_bytes = MAX_FILE_SIZE_BYTES if oversized else None try: content = await asyncio.to_thread( @@ -180,16 +211,16 @@ async def execute( logger.warning(TOOL_FS_ERROR, path=user_path, error=log_key) return ToolExecutionResult(content=msg, is_error=True) - if oversized and not has_line_range: + line_count = content.count("\n") + ( + 1 if content and not content.endswith("\n") else 0 + ) + + if oversized: content += ( f"\n\n[Truncated: file is {size_bytes:,} bytes, " f"showing first {MAX_FILE_SIZE_BYTES:,}]" ) - line_count = content.count("\n") + ( - 1 if content and not content.endswith("\n") else 0 - ) - logger.info( TOOL_FS_READ, path=user_path, diff --git a/src/ai_company/tools/file_system/write_file.py b/src/ai_company/tools/file_system/write_file.py index 3a1255848d..9616f8573d 100644 --- a/src/ai_company/tools/file_system/write_file.py +++ b/src/ai_company/tools/file_system/write_file.py @@ -1,6 +1,9 @@ """Write file tool — creates or overwrites files in the workspace.""" import asyncio +import os +import pathlib +import tempfile from typing import TYPE_CHECKING, Any, Final from ai_company.observability import get_logger @@ -23,14 +26,41 @@ def _write_sync(resolved: Path, content: str, *, create_dirs: bool) -> tuple[int, bool]: """Write content to file synchronously. + Uses an atomic write pattern (temp file + replace) so that a crash + or disk-full during the write does not corrupt an existing file. + + Args: + resolved: Resolved file path within the workspace. + content: Text content to write. + create_dirs: Whether to create parent directories. + Returns: Tuple of (bytes_written, created) where *created* is True if the file did not exist before the write. + + Raises: + IsADirectoryError: If the target is a directory. + PermissionError: If the process lacks write permission. + OSError: For other OS-level I/O failures. """ created = not resolved.exists() if create_dirs: resolved.parent.mkdir(parents=True, exist_ok=True) - resolved.write_text(content, encoding="utf-8") + + fd, tmp_path = tempfile.mkstemp( + dir=str(resolved.parent), + suffix=".tmp", + ) + try: + with os.fdopen(fd, "w", encoding="utf-8") as fh: + fh.write(content) + fh.flush() + os.fsync(fh.fileno()) + pathlib.Path(tmp_path).replace(resolved) + except BaseException: + pathlib.Path(tmp_path).unlink(missing_ok=True) + raise + return resolved.stat().st_size, created diff --git a/tests/unit/tools/file_system/test_delete_file.py b/tests/unit/tools/file_system/test_delete_file.py index 5ff6c5cf89..980b4000be 100644 --- a/tests/unit/tools/file_system/test_delete_file.py +++ b/tests/unit/tools/file_system/test_delete_file.py @@ -9,6 +9,8 @@ from ai_company.tools.file_system.delete_file import DeleteFileTool +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestDeleteFileProperties: diff --git a/tests/unit/tools/file_system/test_edit_file.py b/tests/unit/tools/file_system/test_edit_file.py index 79523ff1f6..525e6bf6c0 100644 --- a/tests/unit/tools/file_system/test_edit_file.py +++ b/tests/unit/tools/file_system/test_edit_file.py @@ -9,6 +9,8 @@ from ai_company.tools.file_system.edit_file import EditFileTool +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestEditFileExecution: @@ -147,6 +149,18 @@ async def test_edit_directory_errors(self, edit_tool: EditFileTool) -> None: assert result.is_error assert "directory" in result.content.lower() + async def test_empty_old_text_rejected(self, edit_tool: EditFileTool) -> None: + """Empty old_text must be rejected.""" + result = await edit_tool.execute( + arguments={ + "path": "hello.txt", + "old_text": "", + "new_text": "injected", + } + ) + assert result.is_error + assert "empty" in result.content.lower() + async def test_edit_large_file_rejected( self, workspace: Path, edit_tool: EditFileTool ) -> None: diff --git a/tests/unit/tools/file_system/test_list_directory.py b/tests/unit/tools/file_system/test_list_directory.py index 69ff38a58b..0f2cb41f36 100644 --- a/tests/unit/tools/file_system/test_list_directory.py +++ b/tests/unit/tools/file_system/test_list_directory.py @@ -14,6 +14,8 @@ if TYPE_CHECKING: from pathlib import Path +pytestmark = pytest.mark.timeout(30) + _SYMLINK_SUPPORTED = not (sys.platform == "win32" and not os.environ.get("CI")) diff --git a/tests/unit/tools/file_system/test_path_validator.py b/tests/unit/tools/file_system/test_path_validator.py index 430117b357..8072ca98a3 100644 --- a/tests/unit/tools/file_system/test_path_validator.py +++ b/tests/unit/tools/file_system/test_path_validator.py @@ -11,6 +11,8 @@ if TYPE_CHECKING: from pathlib import Path +pytestmark = pytest.mark.timeout(30) + _SYMLINK_SUPPORTED = not (sys.platform == "win32" and not os.environ.get("CI")) @@ -28,7 +30,7 @@ def test_nonexistent_workspace_raises(self, tmp_path: Path) -> None: def test_file_as_workspace_raises(self, tmp_path: Path) -> None: f = tmp_path / "file.txt" - f.write_text("x") + f.write_text("x", encoding="utf-8") with pytest.raises(ValueError, match="not an existing directory"): PathValidator(f) @@ -38,7 +40,7 @@ class TestValidate: """Path validation tests.""" def test_relative_path(self, tmp_path: Path) -> None: - (tmp_path / "a.txt").write_text("x") + (tmp_path / "a.txt").write_text("x", encoding="utf-8") pv = PathValidator(tmp_path) result = pv.validate("a.txt") assert result == (tmp_path / "a.txt").resolve() @@ -46,7 +48,7 @@ def test_relative_path(self, tmp_path: Path) -> None: def test_nested_path(self, tmp_path: Path) -> None: sub = tmp_path / "d" sub.mkdir() - (sub / "b.txt").write_text("x") + (sub / "b.txt").write_text("x", encoding="utf-8") pv = PathValidator(tmp_path) result = pv.validate("d/b.txt") assert result == (sub / "b.txt").resolve() @@ -58,7 +60,7 @@ def test_traversal_rejected(self, tmp_path: Path) -> None: def test_absolute_outside_rejected(self, tmp_path: Path) -> None: pv = PathValidator(tmp_path) - with pytest.raises(ValueError, match="escapes workspace"): + with pytest.raises(ValueError, match="Absolute paths not allowed"): pv.validate("/etc/passwd") def test_dot_path(self, tmp_path: Path) -> None: diff --git a/tests/unit/tools/file_system/test_read_file.py b/tests/unit/tools/file_system/test_read_file.py index 454a75ef94..052baef996 100644 --- a/tests/unit/tools/file_system/test_read_file.py +++ b/tests/unit/tools/file_system/test_read_file.py @@ -10,6 +10,8 @@ if TYPE_CHECKING: from pathlib import Path +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestReadFileToolProperties: @@ -71,8 +73,7 @@ async def test_read_start_line_only( arguments={"path": "multi.txt", "start_line": 2} ) assert not result.is_error - assert "b" in result.content - assert "c" in result.content + assert result.content == "b\nc\n" async def test_read_end_line_only( self, workspace: Path, read_tool: ReadFileTool @@ -117,16 +118,26 @@ async def test_large_file_truncated( assert "Truncated" in result.content assert len(result.content) < len(big_content) - async def test_large_file_not_truncated_with_line_range( + async def test_large_file_rejects_line_range( self, workspace: Path, read_tool: ReadFileTool ) -> None: - """When start_line/end_line is given, truncation is bypassed.""" + """Oversized files with line ranges are rejected to prevent DoS.""" lines = ["line\n"] * (MAX_FILE_SIZE_BYTES // 4) big_content = "".join(lines) (workspace / "big_lines.txt").write_text(big_content, encoding="utf-8") result = await read_tool.execute( arguments={"path": "big_lines.txt", "start_line": 1, "end_line": 5} ) - assert not result.is_error - assert "Truncated" not in result.content - assert "line" in result.content + assert result.is_error + assert "too large" in result.content.lower() + + async def test_start_line_greater_than_end_line( + self, workspace: Path, read_tool: ReadFileTool + ) -> None: + """start_line > end_line returns an error.""" + (workspace / "multi.txt").write_text("a\nb\nc\n", encoding="utf-8") + result = await read_tool.execute( + arguments={"path": "multi.txt", "start_line": 3, "end_line": 1} + ) + assert result.is_error + assert "start_line" in result.content diff --git a/tests/unit/tools/file_system/test_write_file.py b/tests/unit/tools/file_system/test_write_file.py index 0b54c73d15..712eedea4c 100644 --- a/tests/unit/tools/file_system/test_write_file.py +++ b/tests/unit/tools/file_system/test_write_file.py @@ -9,6 +9,8 @@ from ai_company.tools.file_system.write_file import WriteFileTool +pytestmark = pytest.mark.timeout(30) + @pytest.mark.unit class TestWriteFileExecution: