Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import shutil
import threading
import time
from collections.abc import Callable, Sequence
from collections.abc import Callable, Iterator, Sequence
from concurrent.futures import ThreadPoolExecutor
from contextlib import suppress
from copy import copy
Expand Down Expand Up @@ -483,15 +483,57 @@ def _display_mount_path(mount_path: str) -> str:
return f"/input/{mount_path}"


def _iter_real_entries(root: Path) -> Iterator[Path]:
"""Walk ``root`` recursively, yielding directories and regular files only.

``Path.rglob`` follows directory symlinks by default, which combined with
``Path.is_file()`` / ``shutil.copy2`` (all follow symlinks) would expose
paths outside the configured input tree if the source tree is
attacker-controlled. This walker mirrors the safe behaviour by checking
``is_symlink()`` at every directory level and never descending through one.

Non-regular files (sockets, FIFOs, devices) are also filtered out so the
signature mirrors exactly what ``_copy_path`` actually stages.
"""
stack: list[Path] = [root]
while stack:
current = stack.pop()
try:
children = list(current.iterdir())
except OSError:
continue
for child in children:
try:
if child.is_symlink():
continue
if child.is_dir():
stack.append(child)
yield child
elif child.is_file():
yield child
# Non-regular files (sockets/FIFOs/devices) are skipped to
# match ``_copy_path``'s staging behaviour.
except OSError:
Comment thread
eavanvalkenburg marked this conversation as resolved.
continue


def _path_tree_signature(path: Path) -> tuple[tuple[str, int, int], ...]:
"""Return a stable signature of the real (non-symlink) file tree under ``path``.

Uses ``lstat()`` and skips symlinks so the signature reflects only entries
that physically live inside the tree. This keeps the cache key consistent
with what ``_copy_path`` actually stages.
"""
if path.is_symlink():
Comment thread
eavanvalkenburg marked this conversation as resolved.
return ()
if path.is_file():
stat = path.stat()
stat = path.lstat()
return ((path.name, int(stat.st_size), int(stat.st_mtime_ns)),)

entries: list[tuple[str, int, int]] = []
for candidate in sorted(path.rglob("*"), key=lambda value: value.as_posix()):
for candidate in sorted(_iter_real_entries(path), key=lambda value: value.as_posix()):
try:
stat = candidate.stat()
stat = candidate.lstat()
except FileNotFoundError:
continue
relative_path = candidate.relative_to(path).as_posix()
Expand All @@ -501,14 +543,31 @@ def _path_tree_signature(path: Path) -> tuple[tuple[str, int, int], ...]:


def _copy_path(source: Path, destination: Path) -> None:
"""Stage ``source`` into ``destination`` without following symlinks.

Symlinks (file or directory) are skipped entirely so a sandbox input tree
can only contain real entries that physically live under the configured
``workspace_root`` or a ``file_mounts`` host path. ``Path.is_dir()``,
``Path.is_file()`` and ``shutil.copy2`` all follow symlinks by default,
which is unsafe when the source tree may be attacker-controlled.
"""
# Detect symlinks before doing anything else - ``is_symlink()`` does not
# follow the link, unlike ``is_dir()`` / ``is_file()``.
if source.is_symlink():
Comment thread
eavanvalkenburg marked this conversation as resolved.
return

if source.is_dir():
destination.mkdir(parents=True, exist_ok=True)
for child in sorted(source.iterdir(), key=lambda value: value.name):
_copy_path(child, destination / child.name)
return

if not source.is_file():
# Non-regular files (sockets, FIFOs, devices) are intentionally skipped.
return

destination.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source, destination)
shutil.copy2(source, destination, follow_symlinks=False)


def _populate_input_dir(*, config: _RunConfig, input_root: Path) -> None:
Expand Down
138 changes: 138 additions & 0 deletions python/packages/hyperlight/tests/hyperlight/test_hyperlight_codeact.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,144 @@ async def test_execute_code_tool_populates_input_dir_with_workspace_and_file_mou
assert (input_root / "data" / "input.txt").read_text(encoding="utf-8") == "hello from mount"


def _build_run_config(
*,
workspace_root: Path | None = None,
file_mounts: tuple = (),
) -> Any:
"""Build a minimal _RunConfig for tests that exercise _populate_input_dir directly."""
return execute_code_module._RunConfig(
backend="wasm",
module="python_guest.path",
module_path=None,
approval_mode="never_require",
tools=(),
workspace_root=workspace_root,
workspace_signature=(),
file_mounts=file_mounts,
allowed_domains=(),
)


def _symlinks_supported(tmp: Path) -> bool:
"""Return True if the current platform/environment supports symlinks.

Mirrors python/packages/core/tests/core/test_skills.py so the symlink
regression tests are skipped on restricted Windows CI runners instead of
failing on ``OSError`` / ``NotImplementedError`` during creation.
"""
test_target = tmp / "_symlink_test_target"
test_link = tmp / "_symlink_test_link"
try:
test_target.write_text("test", encoding="utf-8")
test_link.symlink_to(test_target)
return True
except (OSError, NotImplementedError):
return False
finally:
test_link.unlink(missing_ok=True)
test_target.unlink(missing_ok=True)


def test_populate_input_dir_skips_symlink_to_file_outside_workspace(tmp_path: Path) -> None:
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
workspace = tmp_path / "workspace"
workspace.mkdir()
outside = tmp_path / "outside.txt"
outside.write_text("outside-content", encoding="utf-8")
(workspace / "real.txt").write_text("real-content", encoding="utf-8")
(workspace / "link.txt").symlink_to(outside)

Comment thread
eavanvalkenburg marked this conversation as resolved.
input_root = tmp_path / "input"
input_root.mkdir()

execute_code_module._populate_input_dir(
config=_build_run_config(workspace_root=workspace),
input_root=input_root,
)

# Real file copied; symlink and its target are absent.
assert (input_root / "real.txt").read_text(encoding="utf-8") == "real-content"
assert not (input_root / "link.txt").exists()
assert not (input_root / "link.txt").is_symlink()
# Sanity: no outside-content anywhere in the input tree.
leaked = [
path
for path in input_root.rglob("*")
if path.is_file() and path.read_text(encoding="utf-8") == "outside-content"
]
assert leaked == []


def test_populate_input_dir_skips_symlinked_directory_outside_workspace(tmp_path: Path) -> None:
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
workspace = tmp_path / "workspace"
workspace.mkdir()
outside_dir = tmp_path / "outside_dir"
outside_dir.mkdir()
(outside_dir / "deep.txt").write_text("deep-content", encoding="utf-8")
(workspace / "linked_dir").symlink_to(outside_dir, target_is_directory=True)

input_root = tmp_path / "input"
input_root.mkdir()

execute_code_module._populate_input_dir(
config=_build_run_config(workspace_root=workspace),
input_root=input_root,
)

# Neither the symlink itself nor anything under the symlinked target leaks.
assert not (input_root / "linked_dir").exists()
leaked = [
path for path in input_root.rglob("*") if path.is_file() and path.read_text(encoding="utf-8") == "deep-content"
]
assert leaked == []


def test_populate_input_dir_skips_nested_symlinks(tmp_path: Path) -> None:
"""A symlink several levels deep inside a real subdir must also be skipped."""
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
workspace = tmp_path / "workspace"
(workspace / "real_sub").mkdir(parents=True)
(workspace / "real_sub" / "ok.txt").write_text("ok", encoding="utf-8")
outside = tmp_path / "outside.txt"
outside.write_text("outside-content", encoding="utf-8")
(workspace / "real_sub" / "link.txt").symlink_to(outside)

input_root = tmp_path / "input"
input_root.mkdir()

execute_code_module._populate_input_dir(
config=_build_run_config(workspace_root=workspace),
input_root=input_root,
)

assert (input_root / "real_sub" / "ok.txt").read_text(encoding="utf-8") == "ok"
assert not (input_root / "real_sub" / "link.txt").exists()


def test_path_tree_signature_does_not_follow_symlinks(tmp_path: Path) -> None:
"""The cache-key signature must reflect only real files (mirrors the staged tree)."""
if not _symlinks_supported(tmp_path):
pytest.skip("Symlinks not supported on this platform/environment")
workspace = tmp_path / "workspace"
workspace.mkdir()
real = workspace / "real.txt"
real.write_text("real-content", encoding="utf-8")
outside = tmp_path / "outside.txt"
outside.write_text("outside-content", encoding="utf-8")
(workspace / "link.txt").symlink_to(outside)

signature = execute_code_module._path_tree_signature(workspace)

names = [entry[0] for entry in signature]
assert "real.txt" in names
assert "link.txt" not in names


def test_execute_code_tool_allowed_domains_use_structured_entries_and_replace_by_target() -> None:
execute_code = HyperlightExecuteCodeTool(_registry=_FakeRuntime())

Expand Down
Loading