From 9726a5495f9d5fbc0f1bea99659685453d47e86e Mon Sep 17 00:00:00 2001 From: SergeyMenshykh Date: Wed, 13 May 2026 12:13:55 +0100 Subject: [PATCH 1/3] Align Python FileSkillsSource with agentskills.io spec Update FileSkillsSource to scan spec-defined subdirectories instead of recursive rglob for resource and script discovery: - Resources: scan 'references/' and 'assets/' (was: entire skill tree) - Scripts: scan 'scripts/' (was: entire skill tree) - Add resource_directories and script_directories parameters for customization, with '.' root indicator for skill root files - Add directory validation: reject '..' traversal, absolute paths, empty names; normalize separators and deduplicate directories - Non-recursive scanning within each configured directory (top-level only) - Containment check validates files against target directory, not just skill root, for stronger path-traversal defense - Case-insensitive directory deduplication via os.path.normcase() - Cross-platform absolute path rejection in directory validation - Sort discovery results for stable ordering - Update SkillsProvider.from_paths() to pass new parameters through - Update all tests for new subdirectory-scoped discovery behavior Resolves #5711. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../packages/core/agent_framework/_skills.py | 288 +++++++++-- .../packages/core/tests/core/test_skills.py | 475 ++++++++++++++---- 2 files changed, 606 insertions(+), 157 deletions(-) diff --git a/python/packages/core/agent_framework/_skills.py b/python/packages/core/agent_framework/_skills.py index 44755a2efd..b2c93d2248 100644 --- a/python/packages/core/agent_framework/_skills.py +++ b/python/packages/core/agent_framework/_skills.py @@ -1357,6 +1357,13 @@ def __call__(self, skill: FileSkill, script: FileSkillScript, args: dict[str, An ) DEFAULT_SCRIPT_EXTENSIONS: Final[tuple[str, ...]] = (".py",) +# "." means the skill directory root itself (files directly in the skill folder). +ROOT_DIRECTORY_INDICATOR: Final[str] = "." + +# Standard subdirectory names per https://agentskills.io/specification#directory-structure +DEFAULT_RESOURCE_DIRECTORIES: Final[tuple[str, ...]] = ("references", "assets") +DEFAULT_SCRIPT_DIRECTORIES: Final[tuple[str, ...]] = ("scripts",) + # region Patterns and prompt template # Matches YAML frontmatter delimited by "---" lines. @@ -1562,6 +1569,8 @@ def from_paths( script_runner: SkillScriptRunner | None = None, resource_extensions: tuple[str, ...] | None = None, script_extensions: tuple[str, ...] | None = None, + resource_directories: Sequence[str] | None = None, + script_directories: Sequence[str] | None = None, instruction_template: str | None = None, require_script_approval: bool = False, disable_caching: bool = False, @@ -1584,6 +1593,15 @@ def from_paths( ``(".md", ".json", ".yaml", ".yml", ".csv", ".xml", ".txt")``. script_extensions: File extensions recognized as discoverable scripts. Defaults to ``(".py",)``. + resource_directories: Relative directory paths to scan for + resource files within each skill directory. Use ``"."`` + to include files at the skill root level. Defaults to + ``("references", "assets")`` per the agentskills.io + specification. + script_directories: Relative directory paths to scan for + script files within each skill directory. Use ``"."`` + to include files at the skill root level. Defaults to + ``("scripts",)`` per the agentskills.io specification. instruction_template: Custom system-prompt template for advertising skills. Must contain a ``{skills}`` placeholder. Uses a built-in template when ``None``. @@ -1613,6 +1631,8 @@ def from_paths( script_runner=script_runner, resource_extensions=resource_extensions, script_extensions=script_extensions, + resource_directories=resource_directories, + script_directories=script_directories, ) ) return cls( @@ -2098,7 +2118,15 @@ class FileSkillsSource(SkillsSource): Recursively scans the configured *skill_paths* directories for ``SKILL.md`` files (up to 2 levels deep), parses their YAML frontmatter, - and discovers associated resource and script files from subdirectories. + and discovers associated resource and script files from spec-defined + subdirectories. + + By default, resources are discovered from ``references/`` and ``assets/`` + subdirectories, and scripts from ``scripts/``, per the + `agentskills.io specification + `_. Use *resource_directories* + and *script_directories* to customize which subdirectories are scanned. + Pass ``"."`` to include files at the skill root level. Security: file-based metadata is XML-escaped before prompt injection, and resource reads are guarded against path traversal and symlink escape. @@ -2112,14 +2140,15 @@ class FileSkillsSource(SkillsSource): source = FileSkillsSource(skill_paths="./skills") skills = await source.get_skills() - With a script runner and custom extensions: + With a script runner and custom directories: .. code-block:: python source = FileSkillsSource( skill_paths=["./skills", "./more-skills"], script_runner=my_runner, - script_extensions=(".py", ".sh"), + resource_directories=[".", "references", "assets"], + script_directories=["scripts"], ) """ @@ -2130,6 +2159,8 @@ def __init__( script_runner: SkillScriptRunner | None = None, resource_extensions: tuple[str, ...] | None = None, script_extensions: tuple[str, ...] | None = None, + resource_directories: Sequence[str] | None = None, + script_directories: Sequence[str] | None = None, ) -> None: """Initialize a FileSkillsSource. @@ -2149,6 +2180,18 @@ def __init__( ``(".md", ".json", ".yaml", ".yml", ".csv", ".xml", ".txt")``. script_extensions: File extensions recognized as discoverable scripts. Defaults to ``(".py",)``. + resource_directories: Relative directory paths to scan for + resource files within each skill directory. Use ``"."`` + to include files at the skill root level. Defaults to + ``("references", "assets")`` per the + `agentskills.io specification + `_. + script_directories: Relative directory paths to scan for + script files within each skill directory. Use ``"."`` + to include files at the skill root level. Defaults to + ``("scripts",)`` per the + `agentskills.io specification + `_. """ if isinstance(skill_paths, (str, Path)): self._skill_paths: list[str] = [str(skill_paths)] @@ -2159,6 +2202,17 @@ def __init__( self._resource_extensions = resource_extensions or DEFAULT_RESOURCE_EXTENSIONS self._script_extensions = script_extensions or DEFAULT_SCRIPT_EXTENSIONS + self._resource_directories: tuple[str, ...] = ( + tuple(FileSkillsSource._validate_and_normalize_directory_names(resource_directories)) + if resource_directories is not None + else DEFAULT_RESOURCE_DIRECTORIES + ) + self._script_directories: tuple[str, ...] = ( + tuple(FileSkillsSource._validate_and_normalize_directory_names(script_directories)) + if script_directories is not None + else DEFAULT_SCRIPT_DIRECTORIES + ) + async def get_skills(self) -> list[Skill]: """Discover and return all file-based skills from configured paths. @@ -2197,12 +2251,16 @@ async def get_skills(self) -> list[Skill]: ) # Discover and attach file-based resources - for rn in FileSkillsSource._discover_resource_files(skill_path, self._resource_extensions): + for rn in FileSkillsSource._discover_resource_files( + skill_path, self._resource_extensions, self._resource_directories + ): resource_full_path = FileSkillsSource._get_validated_resource_path(skill_path, rn) file_skill.resources.append(_FileSkillResource(name=rn, full_path=resource_full_path)) # Discover and attach file-based scripts as SkillScript instances - for sn in FileSkillsSource._discover_script_files(skill_path, self._script_extensions): + for sn in FileSkillsSource._discover_script_files( + skill_path, self._script_extensions, self._script_directories + ): script_full_path = os.path.normpath(os.path.join(skill_path, sn)) # noqa: ASYNC240 file_skill.scripts.append( FileSkillScript(name=sn, full_path=script_full_path, runner=self._script_runner) @@ -2282,116 +2340,246 @@ def _has_symlink_in_path(path: str, directory: str) -> bool: return True return False + @staticmethod + def _validate_and_normalize_directory_names( + directories: Sequence[str], + ) -> list[str]: + """Validate and normalize relative directory names. + + Ensures each entry is a safe relative path. The ``"."`` root indicator + is passed through unchanged. Entries containing ``..`` segments or + representing absolute paths are rejected with a warning and skipped. + Empty or whitespace-only entries raise :class:`ValueError`. + + Args: + directories: Sequence of relative directory names to validate. + + Returns: + A list of validated, normalized directory names. + + Raises: + ValueError: If any entry is empty or whitespace-only. + """ + result: list[str] = [] + for directory in directories: + if not directory or not directory.strip(): + raise ValueError("Directory names must not be empty or whitespace.") + + # Normalize separators: backslash → forward slash, strip leading ./ and trailing / + normalized = PurePosixPath(directory.replace("\\", "/")).as_posix() + + # "." and "./" both normalize to "." — treat as root indicator + if normalized == ROOT_DIRECTORY_INDICATOR: + result.append(ROOT_DIRECTORY_INDICATOR) + continue + + # Reject absolute paths (check both POSIX and Windows-style roots + # so validation is consistent regardless of the host OS) + if ( + os.path.isabs(directory) + or normalized.startswith("/") + or (len(normalized) >= 2 and normalized[1] == ":") + ): + logger.warning( + "Skipping directory '%s': absolute paths are not allowed.", + directory, + ) + continue + + # Reject paths containing ".." segments + if any(segment == ".." for segment in normalized.split("/")): + logger.warning( + "Skipping directory '%s': parent traversal ('..') is not allowed.", + directory, + ) + continue + + result.append(normalized) + return result + @staticmethod def _discover_resource_files( skill_dir_path: str, extensions: tuple[str, ...] = DEFAULT_RESOURCE_EXTENSIONS, + directories: tuple[str, ...] = DEFAULT_RESOURCE_DIRECTORIES, ) -> list[str]: - """Scan a skill directory for resource files matching *extensions*. + """Scan configured subdirectories for resource files matching *extensions*. - Recursively walks *skill_dir_path* and collects files whose extension - is in *extensions*, excluding ``SKILL.md`` itself. Each candidate is - validated against path-traversal and symlink-escape checks; unsafe - files are skipped with a warning. + Scans each directory in *directories* within *skill_dir_path* for files + whose extension is in *extensions*, excluding ``SKILL.md`` itself. + Use ``"."`` in *directories* to include files at the skill root level. + Each candidate is validated against path-traversal and symlink-escape + checks; unsafe files are skipped with a warning. Args: skill_dir_path: Absolute path to the skill directory to scan. extensions: Tuple of allowed file extensions (e.g. ``(".md", ".json")``). + directories: Relative subdirectory paths to scan for resources. Returns: - Relative resource paths (forward-slash-separated) for every + Sorted relative resource paths (forward-slash-separated) for every discovered file that passes security checks. """ skill_dir = Path(skill_dir_path).absolute() root_directory_path = str(skill_dir) resources: list[str] = [] normalized_extensions = {e.lower() for e in extensions} + seen_directories: set[str] = set() - for resource_file in skill_dir.rglob("*"): - if not resource_file.is_file(): - continue + for directory in directories: + is_root = directory == ROOT_DIRECTORY_INDICATOR + target_dir = skill_dir if is_root else (skill_dir / directory) - if resource_file.name.upper() == SKILL_FILE_NAME.upper(): + # Deduplicate after resolving to avoid scanning the same directory twice. + # Use normcase for case-insensitive dedup on case-insensitive filesystems. + resolved_target = str(Path(os.path.normpath(target_dir)).absolute()) + dedup_key = os.path.normcase(resolved_target) + if dedup_key in seen_directories: continue + seen_directories.add(dedup_key) - if resource_file.suffix.lower() not in normalized_extensions: + if not target_dir.is_dir(): continue - resource_full_path = str(Path(os.path.normpath(resource_file)).absolute()) - - if not FileSkillsSource._is_path_within_directory(resource_full_path, root_directory_path): + # Directory-level symlink check for non-root directories + if not is_root and FileSkillsSource._has_symlink_in_path(resolved_target, root_directory_path): logger.warning( - "Skipping resource '%s': resolves outside skill directory '%s'", - resource_file, + "Skipping resource directory '%s': symlink detected in path under skill directory '%s'", + directory, skill_dir_path, ) continue - if FileSkillsSource._has_symlink_in_path(resource_full_path, root_directory_path): - logger.warning( - "Skipping resource '%s': symlink detected in path under skill directory '%s'", - resource_file, - skill_dir_path, - ) + # Scan top-level files only (non-recursive) within this directory + try: + entries = list(target_dir.iterdir()) + except OSError: continue - rel_path = resource_file.relative_to(skill_dir) - resources.append(FileSkillsSource._normalize_resource_path(str(rel_path))) + for resource_file in entries: + if not resource_file.is_file(): + continue + + if resource_file.name.upper() == SKILL_FILE_NAME.upper(): + continue + + if resource_file.suffix.lower() not in normalized_extensions: + continue + + resource_full_path = str(Path(os.path.normpath(resource_file)).absolute()) + + # Containment check: file must resolve within the target directory + if not FileSkillsSource._is_path_within_directory(resource_full_path, resolved_target): + logger.warning( + "Skipping resource '%s': resolves outside target directory '%s'", + resource_file, + directory, + ) + continue + + if FileSkillsSource._has_symlink_in_path(resource_full_path, root_directory_path): + logger.warning( + "Skipping resource '%s': symlink detected in path under skill directory '%s'", + resource_file, + skill_dir_path, + ) + continue + rel_path = resource_file.relative_to(skill_dir) + resources.append(FileSkillsSource._normalize_resource_path(str(rel_path))) + + resources.sort() return resources @staticmethod def _discover_script_files( skill_dir_path: str, extensions: tuple[str, ...] = DEFAULT_SCRIPT_EXTENSIONS, + directories: tuple[str, ...] = DEFAULT_SCRIPT_DIRECTORIES, ) -> list[str]: - """Scan a skill directory for script files matching *extensions*. + """Scan configured subdirectories for script files matching *extensions*. - Recursively walks *skill_dir_path* and collects files whose extension - is in *extensions*. Each candidate is validated against path-traversal - and symlink-escape checks; unsafe files are skipped with a warning. + Scans each directory in *directories* within *skill_dir_path* for files + whose extension is in *extensions*. Use ``"."`` in *directories* to + include files at the skill root level. Each candidate is validated + against path-traversal and symlink-escape checks; unsafe files are + skipped with a warning. Args: skill_dir_path: Absolute path to the skill directory to scan. extensions: Tuple of allowed script extensions (e.g. ``(".py",)``). + directories: Relative subdirectory paths to scan for scripts. Returns: - Relative script paths (forward-slash-separated) for every + Sorted relative script paths (forward-slash-separated) for every discovered file that passes security checks. """ skill_dir = Path(skill_dir_path).absolute() root_directory_path = str(skill_dir) scripts: list[str] = [] normalized_extensions = {e.lower() for e in extensions} + seen_directories: set[str] = set() - for script_file in skill_dir.rglob("*"): - if not script_file.is_file(): - continue + for directory in directories: + is_root = directory == ROOT_DIRECTORY_INDICATOR + target_dir = skill_dir if is_root else (skill_dir / directory) - if script_file.suffix.lower() not in normalized_extensions: + # Deduplicate after resolving to avoid scanning the same directory twice. + # Use normcase for case-insensitive dedup on case-insensitive filesystems. + resolved_target = str(Path(os.path.normpath(target_dir)).absolute()) + dedup_key = os.path.normcase(resolved_target) + if dedup_key in seen_directories: continue + seen_directories.add(dedup_key) - script_full_path = str(Path(os.path.normpath(script_file)).absolute()) + if not target_dir.is_dir(): + continue - if not FileSkillsSource._is_path_within_directory(script_full_path, root_directory_path): + # Directory-level symlink check for non-root directories + if not is_root and FileSkillsSource._has_symlink_in_path(resolved_target, root_directory_path): logger.warning( - "Skipping script '%s': resolves outside skill directory '%s'", - script_file, + "Skipping script directory '%s': symlink detected in path under skill directory '%s'", + directory, skill_dir_path, ) continue - if FileSkillsSource._has_symlink_in_path(script_full_path, root_directory_path): - logger.warning( - "Skipping script '%s': symlink detected in path under skill directory '%s'", - script_file, - skill_dir_path, - ) + # Scan top-level files only (non-recursive) within this directory + try: + entries = list(target_dir.iterdir()) + except OSError: continue - rel_path = script_file.relative_to(skill_dir) - scripts.append(FileSkillsSource._normalize_resource_path(str(rel_path))) + for script_file in entries: + if not script_file.is_file(): + continue + + if script_file.suffix.lower() not in normalized_extensions: + continue + + script_full_path = str(Path(os.path.normpath(script_file)).absolute()) + + # Containment check: file must resolve within the target directory + if not FileSkillsSource._is_path_within_directory(script_full_path, resolved_target): + logger.warning( + "Skipping script '%s': resolves outside target directory '%s'", + script_file, + directory, + ) + continue + + if FileSkillsSource._has_symlink_in_path(script_full_path, root_directory_path): + logger.warning( + "Skipping script '%s': symlink detected in path under skill directory '%s'", + script_file, + skill_dir_path, + ) + continue + + rel_path = script_file.relative_to(skill_dir) + scripts.append(FileSkillsSource._normalize_resource_path(str(rel_path))) + scripts.sort() return scripts @staticmethod diff --git a/python/packages/core/tests/core/test_skills.py b/python/packages/core/tests/core/test_skills.py index b268b31551..2dbc9b9363 100644 --- a/python/packages/core/tests/core/test_skills.py +++ b/python/packages/core/tests/core/test_skills.py @@ -30,8 +30,11 @@ SkillsProvider, ) from agent_framework._skills import ( + DEFAULT_RESOURCE_DIRECTORIES, DEFAULT_RESOURCE_EXTENSIONS, + DEFAULT_SCRIPT_DIRECTORIES, DEFAULT_SCRIPT_EXTENSIONS, + ROOT_DIRECTORY_INDICATOR, InlineSkillResource, InlineSkillScript, _create_resource_element, @@ -143,6 +146,8 @@ async def _discover_file_skills_for_test( *, resource_extensions: tuple[str, ...] | None = None, script_extensions: tuple[str, ...] | None = None, + resource_directories: Sequence[str] | None = None, + script_directories: Sequence[str] | None = None, script_runner: Any = None, ) -> dict[str, FileSkill]: """Test helper: discover file skills and return as a dict keyed by name. @@ -155,6 +160,10 @@ async def _discover_file_skills_for_test( kwargs["resource_extensions"] = resource_extensions if script_extensions is not None: kwargs["script_extensions"] = script_extensions + if resource_directories is not None: + kwargs["resource_directories"] = resource_directories + if script_directories is not None: + kwargs["script_directories"] = script_directories if script_runner is not None: kwargs["script_runner"] = script_runner @@ -191,59 +200,103 @@ def test_no_change_for_clean_path(self) -> None: class TestDiscoverResourceFiles: """Tests for _discover_resource_files (filesystem-based resource discovery).""" - def test_discovers_md_files(self, tmp_path: Path) -> None: + def test_discovers_md_files_in_references(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" skill_dir.mkdir() (skill_dir / "SKILL.md").write_text("---\nname: s\ndescription: d\n---\n", encoding="utf-8") - refs = skill_dir / "refs" + refs = skill_dir / "references" refs.mkdir() (refs / "FAQ.md").write_text("FAQ content", encoding="utf-8") resources = FileSkillsSource._discover_resource_files(str(skill_dir)) - assert "refs/FAQ.md" in resources + assert "references/FAQ.md" in resources - def test_excludes_skill_md(self, tmp_path: Path) -> None: + def test_discovers_md_files_in_assets(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" skill_dir.mkdir() - (skill_dir / "SKILL.md").write_text("content", encoding="utf-8") + assets = skill_dir / "assets" + assets.mkdir() + (assets / "guide.md").write_text("guide", encoding="utf-8") resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + assert "assets/guide.md" in resources + + def test_excludes_skill_md_at_root(self, tmp_path: Path) -> None: + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text("content", encoding="utf-8") + resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=(".",)) assert len(resources) == 0 def test_discovers_multiple_extensions(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() - (skill_dir / "data.json").write_text("{}", encoding="utf-8") - (skill_dir / "config.yaml").write_text("key: val", encoding="utf-8") - (skill_dir / "notes.txt").write_text("notes", encoding="utf-8") + refs = skill_dir / "references" + refs.mkdir(parents=True) + (refs / "data.json").write_text("{}", encoding="utf-8") + (refs / "config.yaml").write_text("key: val", encoding="utf-8") + (refs / "notes.txt").write_text("notes", encoding="utf-8") resources = FileSkillsSource._discover_resource_files(str(skill_dir)) assert len(resources) == 3 names = set(resources) - assert "data.json" in names - assert "config.yaml" in names - assert "notes.txt" in names + assert "references/data.json" in names + assert "references/config.yaml" in names + assert "references/notes.txt" in names def test_ignores_unsupported_extensions(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() - (skill_dir / "image.png").write_bytes(b"\x89PNG") - (skill_dir / "binary.exe").write_bytes(b"\x00") + refs = skill_dir / "references" + refs.mkdir(parents=True) + (refs / "image.png").write_bytes(b"\x89PNG") + (refs / "binary.exe").write_bytes(b"\x00") resources = FileSkillsSource._discover_resource_files(str(skill_dir)) assert len(resources) == 0 def test_custom_extensions(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() - (skill_dir / "data.json").write_text("{}", encoding="utf-8") - (skill_dir / "notes.txt").write_text("notes", encoding="utf-8") + refs = skill_dir / "references" + refs.mkdir(parents=True) + (refs / "data.json").write_text("{}", encoding="utf-8") + (refs / "notes.txt").write_text("notes", encoding="utf-8") resources = FileSkillsSource._discover_resource_files(str(skill_dir), extensions=(".json",)) - assert resources == ["data.json"] + assert resources == ["references/data.json"] - def test_discovers_nested_files(self, tmp_path: Path) -> None: + def test_does_not_discover_nested_files(self, tmp_path: Path) -> None: + """Non-recursive: files inside subdirectories of configured dirs are not discovered.""" skill_dir = tmp_path / "my-skill" - sub = skill_dir / "refs" / "deep" + sub = skill_dir / "references" / "deep" sub.mkdir(parents=True) (sub / "doc.md").write_text("deep doc", encoding="utf-8") resources = FileSkillsSource._discover_resource_files(str(skill_dir)) - assert "refs/deep/doc.md" in resources + assert len(resources) == 0 + + def test_root_directory_discovers_root_files(self, tmp_path: Path) -> None: + """The '.' root indicator discovers files at the skill root level.""" + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + (skill_dir / "data.json").write_text("{}", encoding="utf-8") + resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=(".",)) + assert "data.json" in resources + + def test_root_does_not_discover_by_default(self, tmp_path: Path) -> None: + """Files at skill root are not discovered with default directories.""" + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + (skill_dir / "data.json").write_text("{}", encoding="utf-8") + resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + assert len(resources) == 0 + + def test_custom_directories(self, tmp_path: Path) -> None: + """Custom directory names override defaults.""" + skill_dir = tmp_path / "my-skill" + custom = skill_dir / "docs" + custom.mkdir(parents=True) + (custom / "readme.md").write_text("readme", encoding="utf-8") + resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=("docs",)) + assert "docs/readme.md" in resources + + def test_nonexistent_directory_silently_skipped(self, tmp_path: Path) -> None: + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=("nonexistent",)) + assert resources == [] def test_empty_directory(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" @@ -260,6 +313,27 @@ def test_default_extensions_match_constant(self) -> None: assert ".xml" in DEFAULT_RESOURCE_EXTENSIONS assert ".txt" in DEFAULT_RESOURCE_EXTENSIONS + def test_duplicate_directories_deduplicated(self, tmp_path: Path) -> None: + """Duplicate directory entries should not produce duplicate resources.""" + skill_dir = tmp_path / "my-skill" + refs = skill_dir / "references" + refs.mkdir(parents=True) + (refs / "doc.md").write_text("content", encoding="utf-8") + resources = FileSkillsSource._discover_resource_files( + str(skill_dir), directories=("references", "references") + ) + assert resources == ["references/doc.md"] + + def test_results_are_sorted(self, tmp_path: Path) -> None: + """Results should be sorted for stable ordering.""" + skill_dir = tmp_path / "my-skill" + refs = skill_dir / "references" + refs.mkdir(parents=True) + (refs / "zebra.md").write_text("z", encoding="utf-8") + (refs / "alpha.md").write_text("a", encoding="utf-8") + resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + assert resources == ["references/alpha.md", "references/zebra.md"] + class TestTryParseSkillDocument: """Tests for _extract_frontmatter.""" @@ -414,11 +488,11 @@ async def test_skill_with_resources(self, tmp_path: Path) -> None: tmp_path, "my-skill", body="Instructions here.", - resources={"refs/FAQ.md": "FAQ content"}, + resources={"references/FAQ.md": "FAQ content"}, ) skills = await _discover_file_skills_for_test([str(tmp_path)]) assert "my-skill" in skills - assert [r.name for r in skills["my-skill"].resources] == ["refs/FAQ.md"] + assert [r.name for r in skills["my-skill"].resources] == ["references/FAQ.md"] async def test_skill_discovers_all_resource_files(self, tmp_path: Path) -> None: """Resources are discovered by filesystem scan, not by markdown links.""" @@ -426,13 +500,13 @@ async def test_skill_discovers_all_resource_files(self, tmp_path: Path) -> None: tmp_path, "my-skill", body="No links here.", - resources={"data.json": '{"key": "val"}', "refs/doc.md": "doc content"}, + resources={"references/data.json": '{"key": "val"}', "assets/doc.md": "doc content"}, ) skills = await _discover_file_skills_for_test([str(tmp_path)]) assert "my-skill" in skills resource_names = sorted(r.name for r in skills["my-skill"].resources) - assert "data.json" in resource_names - assert "refs/doc.md" in resource_names + assert "assets/doc.md" in resource_names + assert "references/data.json" in resource_names # --------------------------------------------------------------------------- @@ -447,12 +521,12 @@ async def test_reads_valid_resource(self, tmp_path: Path) -> None: _write_skill( tmp_path, "my-skill", - body="See [doc](refs/FAQ.md).", - resources={"refs/FAQ.md": "FAQ content here"}, + body="See [doc](references/FAQ.md).", + resources={"references/FAQ.md": "FAQ content here"}, ) skill_dir = tmp_path / "my-skill" - full_path = str(skill_dir / "refs" / "FAQ.md") - resource = _FileSkillResource(name="refs/FAQ.md", full_path=full_path) + full_path = str(skill_dir / "references" / "FAQ.md") + resource = _FileSkillResource(name="references/FAQ.md", full_path=full_path) content = await resource.read() assert content == "FAQ content here" @@ -469,12 +543,12 @@ async def test_reads_resource_with_exact_casing(self, tmp_path: Path) -> None: _write_skill( tmp_path, "my-skill", - body="See [doc](refs/FAQ.md).", - resources={"refs/FAQ.md": "FAQ content"}, + body="See [doc](references/FAQ.md).", + resources={"references/FAQ.md": "FAQ content"}, ) skill_dir = tmp_path / "my-skill" - full_path = str(skill_dir / "refs" / "FAQ.md") - resource = _FileSkillResource(name="refs/FAQ.md", full_path=full_path) + full_path = str(skill_dir / "references" / "FAQ.md") + resource = _FileSkillResource(name="references/FAQ.md", full_path=full_path) content = await resource.read() assert content == "FAQ content" @@ -487,7 +561,7 @@ def test_path_traversal_blocked_at_discovery(self, tmp_path: Path) -> None: skill_dir = tmp_path / "skill" skill_dir.mkdir() (tmp_path / "secret.md").write_text("secret", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=(".",)) assert not any("secret" in r for r in resources) @@ -632,13 +706,13 @@ async def test_load_skill_preserves_file_skill_content(self, tmp_path: Path) -> _write_skill( tmp_path, "my-skill", - body="See [doc](refs/FAQ.md).", - resources={"refs/FAQ.md": "FAQ content"}, + body="See [doc](references/FAQ.md).", + resources={"references/FAQ.md": "FAQ content"}, ) provider = SkillsProvider.from_paths(str(tmp_path)) await _init_provider(provider) result = provider._load_skill(_raw_skills(provider), "my-skill") - assert "See [doc](refs/FAQ.md)." in result + assert "See [doc](references/FAQ.md)." in result async def test_load_skill_unknown_returns_error(self, tmp_path: Path) -> None: provider = SkillsProvider.from_paths(str(tmp_path)) @@ -656,12 +730,12 @@ async def test_read_skill_resource_returns_content(self, tmp_path: Path) -> None _write_skill( tmp_path, "my-skill", - body="See [doc](refs/FAQ.md).", - resources={"refs/FAQ.md": "FAQ content"}, + body="See [doc](references/FAQ.md).", + resources={"references/FAQ.md": "FAQ content"}, ) provider = SkillsProvider.from_paths(str(tmp_path)) await _init_provider(provider) - result = await provider._read_skill_resource(_raw_skills(provider), "my-skill", "refs/FAQ.md") + result = await provider._read_skill_resource(_raw_skills(provider), "my-skill", "references/FAQ.md") assert result == "FAQ content" async def test_read_skill_resource_unknown_skill_returns_error(self, tmp_path: Path) -> None: @@ -790,7 +864,7 @@ async def test_discover_skips_symlinked_resource(self, tmp_path: Path) -> None: "---\nname: my-skill\ndescription: A test skill.\n---\nInstructions.\n", encoding="utf-8", ) - refs_dir = skill_dir / "refs" + refs_dir = skill_dir / "references" refs_dir.mkdir() (refs_dir / "leak.md").symlink_to(outside_file) # Also add a safe resource @@ -799,8 +873,8 @@ async def test_discover_skips_symlinked_resource(self, tmp_path: Path) -> None: skills = await _discover_file_skills_for_test([str(tmp_path)]) assert "my-skill" in skills resource_names = [r.name for r in skills["my-skill"].resources] - assert "refs/leak.md" not in resource_names - assert "refs/safe.md" in resource_names + assert "references/leak.md" not in resource_names + assert "references/safe.md" in resource_names def test_discover_resource_files_rejects_symlinked_resource(self, tmp_path: Path) -> None: """_discover_resource_files should exclude a symlinked resource file.""" @@ -810,12 +884,12 @@ def test_discover_resource_files_rejects_symlinked_resource(self, tmp_path: Path outside_file = tmp_path / "secret.md" outside_file.write_text("secret content", encoding="utf-8") - refs_dir = skill_dir / "refs" + refs_dir = skill_dir / "references" refs_dir.mkdir() (refs_dir / "leak.md").symlink_to(outside_file) resources = FileSkillsSource._discover_resource_files(str(skill_dir)) - assert "refs/leak.md" not in resources + assert "references/leak.md" not in resources def test_discover_skips_symlinked_script(self, tmp_path: Path) -> None: """_discover_script_files should skip scripts with symlinks in their path.""" @@ -1324,21 +1398,22 @@ async def test_combined_prompt_includes_both(self, tmp_path: Path) -> None: async def test_custom_resource_extensions(self, tmp_path: Path) -> None: """SkillsProvider accepts custom resource_extensions.""" skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + refs = skill_dir / "references" + refs.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: A test skill.\n---\nBody.", encoding="utf-8", ) - (skill_dir / "data.json").write_text("{}", encoding="utf-8") - (skill_dir / "notes.txt").write_text("notes", encoding="utf-8") + (refs / "data.json").write_text("{}", encoding="utf-8") + (refs / "notes.txt").write_text("notes", encoding="utf-8") # Only discover .json files provider = SkillsProvider.from_paths(str(tmp_path), resource_extensions=(".json",)) await _init_provider(provider) skill = _ctx(provider)[0]["my-skill"] resource_names = [r.name for r in skill.resources] - assert "data.json" in resource_names - assert "notes.txt" not in resource_names + assert "references/data.json" in resource_names + assert "references/notes.txt" not in resource_names # --------------------------------------------------------------------------- @@ -1370,11 +1445,11 @@ def test_path_set(self, tmp_path: Path) -> None: assert skill.path == str(tmp_path / "my-skill") async def test_resources_populated(self, tmp_path: Path) -> None: - _write_skill(tmp_path, "my-skill", resources={"refs/doc.md": "content"}) + _write_skill(tmp_path, "my-skill", resources={"references/doc.md": "content"}) skills = await _discover_file_skills_for_test([str(tmp_path)]) assert "my-skill" in skills resource_names = [r.name for r in skills["my-skill"].resources] - assert "refs/doc.md" in resource_names + assert "references/doc.md" in resource_names # --------------------------------------------------------------------------- @@ -1429,12 +1504,12 @@ class TestDiscoverResourceFilesEdgeCases: """Additional edge-case tests for filesystem resource discovery.""" def test_excludes_skill_md_case_insensitive(self, tmp_path: Path) -> None: - """SKILL.md in any casing is excluded.""" + """SKILL.md in any casing is excluded when scanning root.""" skill_dir = tmp_path / "my-skill" skill_dir.mkdir() (skill_dir / "skill.md").write_text("lowercase name", encoding="utf-8") (skill_dir / "other.md").write_text("keep me", encoding="utf-8") - resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + resources = FileSkillsSource._discover_resource_files(str(skill_dir), directories=(".",)) names = [r.lower() for r in resources] assert "skill.md" not in names assert "other.md" in resources @@ -1442,19 +1517,173 @@ def test_excludes_skill_md_case_insensitive(self, tmp_path: Path) -> None: def test_skips_directories(self, tmp_path: Path) -> None: """Directories are not included as resources even if their name matches an extension.""" skill_dir = tmp_path / "my-skill" - subdir = skill_dir / "data.json" - subdir.mkdir(parents=True) + refs = skill_dir / "references" + refs.mkdir(parents=True) + subdir = refs / "data.json" + subdir.mkdir() resources = FileSkillsSource._discover_resource_files(str(skill_dir)) assert resources == [] def test_extension_matching_is_case_insensitive(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() - (skill_dir / "NOTES.TXT").write_text("caps", encoding="utf-8") + refs = skill_dir / "references" + refs.mkdir(parents=True) + (refs / "NOTES.TXT").write_text("caps", encoding="utf-8") resources = FileSkillsSource._discover_resource_files(str(skill_dir)) assert len(resources) == 1 +class TestValidateAndNormalizeDirectoryNames: + """Tests for _validate_and_normalize_directory_names.""" + + def test_simple_directory_name(self) -> None: + result = FileSkillsSource._validate_and_normalize_directory_names(["references"]) + assert result == ["references"] + + def test_root_indicator(self) -> None: + result = FileSkillsSource._validate_and_normalize_directory_names(["."]) + assert result == ["."] + + def test_dot_slash_normalizes_to_root(self) -> None: + result = FileSkillsSource._validate_and_normalize_directory_names(["./"]) + assert result == ["."] + + def test_backslash_dot_normalizes_to_root(self) -> None: + result = FileSkillsSource._validate_and_normalize_directory_names([".\\"]) + assert result == ["."] + + def test_backslashes_normalized(self) -> None: + result = FileSkillsSource._validate_and_normalize_directory_names(["sub\\scripts"]) + assert result == ["sub/scripts"] + + def test_trailing_slash_stripped(self) -> None: + result = FileSkillsSource._validate_and_normalize_directory_names(["scripts/"]) + assert result == ["scripts"] + + def test_leading_dot_slash_stripped(self) -> None: + result = FileSkillsSource._validate_and_normalize_directory_names(["./references"]) + assert result == ["references"] + + def test_rejects_parent_traversal(self) -> None: + result = FileSkillsSource._validate_and_normalize_directory_names(["../secrets"]) + assert result == [] + + def test_rejects_embedded_parent_traversal(self) -> None: + result = FileSkillsSource._validate_and_normalize_directory_names(["sub/../secrets"]) + assert result == [] + + def test_rejects_absolute_path(self) -> None: + result = FileSkillsSource._validate_and_normalize_directory_names(["/etc/passwd"]) + assert result == [] + + def test_rejects_windows_absolute_path(self) -> None: + result = FileSkillsSource._validate_and_normalize_directory_names(["C:\\Windows"]) + assert result == [] + + def test_empty_string_raises(self) -> None: + with pytest.raises(ValueError, match="empty or whitespace"): + FileSkillsSource._validate_and_normalize_directory_names([""]) + + def test_whitespace_only_raises(self) -> None: + with pytest.raises(ValueError, match="empty or whitespace"): + FileSkillsSource._validate_and_normalize_directory_names([" "]) + + def test_multiple_directories(self) -> None: + result = FileSkillsSource._validate_and_normalize_directory_names( + [".", "references", "assets", "scripts"] + ) + assert result == [".", "references", "assets", "scripts"] + + def test_default_resource_directories(self) -> None: + assert DEFAULT_RESOURCE_DIRECTORIES == ("references", "assets") + + def test_default_script_directories(self) -> None: + assert DEFAULT_SCRIPT_DIRECTORIES == ("scripts",) + + def test_root_directory_indicator_is_dot(self) -> None: + assert ROOT_DIRECTORY_INDICATOR == "." + + +class TestFileSkillsSourceDirectories: + """Tests for resource_directories and script_directories parameters.""" + + async def test_custom_resource_directories(self, tmp_path: Path) -> None: + """Custom resource_directories controls which dirs are scanned.""" + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + "---\nname: my-skill\ndescription: test\n---\nBody", + encoding="utf-8", + ) + # Put resource in a custom directory + docs = skill_dir / "docs" + docs.mkdir() + (docs / "guide.md").write_text("guide", encoding="utf-8") + # Also put one in default references/ — should not be found + refs = skill_dir / "references" + refs.mkdir() + (refs / "ref.md").write_text("ref", encoding="utf-8") + + source = FileSkillsSource(str(tmp_path), resource_directories=["docs"]) + skills = await source.get_skills() + resource_names = [r.name for r in skills[0].resources] + assert "docs/guide.md" in resource_names + assert "references/ref.md" not in resource_names + + async def test_custom_script_directories(self, tmp_path: Path) -> None: + """Custom script_directories controls which dirs are scanned.""" + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + "---\nname: my-skill\ndescription: test\n---\nBody", + encoding="utf-8", + ) + # Put script in a custom directory + tools = skill_dir / "tools" + tools.mkdir() + (tools / "run.py").write_text("print('run')", encoding="utf-8") + + source = FileSkillsSource(str(tmp_path), script_directories=["tools"]) + skills = await source.get_skills() + script_names = [s.name for s in skills[0].scripts] + assert "tools/run.py" in script_names + + async def test_root_indicator_discovers_root_files(self, tmp_path: Path) -> None: + """The '.' root indicator discovers files at the skill root.""" + skill_dir = tmp_path / "my-skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + "---\nname: my-skill\ndescription: test\n---\nBody", + encoding="utf-8", + ) + (skill_dir / "data.json").write_text("{}", encoding="utf-8") + + source = FileSkillsSource(str(tmp_path), resource_directories=[".", "references"]) + skills = await source.get_skills() + resource_names = [r.name for r in skills[0].resources] + assert "data.json" in resource_names + + async def test_from_paths_passes_directories(self, tmp_path: Path) -> None: + """from_paths passes resource_directories and script_directories through.""" + skill_dir = tmp_path / "my-skill" + docs = skill_dir / "docs" + docs.mkdir(parents=True) + (skill_dir / "SKILL.md").write_text( + "---\nname: my-skill\ndescription: test\n---\nBody", + encoding="utf-8", + ) + (docs / "guide.md").write_text("guide", encoding="utf-8") + + provider = SkillsProvider.from_paths( + str(tmp_path), + resource_directories=["docs"], + ) + await _init_provider(provider) + skill = _ctx(provider)[0]["my-skill"] + resource_names = [r.name for r in skill.resources] + assert "docs/guide.md" in resource_names + + # --------------------------------------------------------------------------- # Tests: _is_path_within_directory # --------------------------------------------------------------------------- @@ -2614,12 +2843,13 @@ async def __call__(self, skill, script, args=None): assert isinstance(_CustomRunner(), SkillScriptRunner) skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + scripts_dir = skill_dir / "scripts" + scripts_dir.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (skill_dir / "run.py").write_text("print('hi')", encoding="utf-8") + (scripts_dir / "run.py").write_text("print('hi')", encoding="utf-8") provider = SkillsProvider.from_paths( str(tmp_path), @@ -2635,12 +2865,13 @@ def sync_runner(skill, script, args=None): assert isinstance(sync_runner, SkillScriptRunner) skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + scripts_dir = skill_dir / "scripts" + scripts_dir.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (skill_dir / "run.py").write_text("print('hi')", encoding="utf-8") + (scripts_dir / "run.py").write_text("print('hi')", encoding="utf-8") provider = SkillsProvider.from_paths( str(tmp_path), @@ -2652,12 +2883,13 @@ def sync_runner(skill, script, args=None): async def test_file_script_with_sync_runner_executes(self, tmp_path: Path) -> None: """A sync script_runner is awaitable through the provider's run_skill_script.""" skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + scripts_dir = skill_dir / "scripts" + scripts_dir.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (skill_dir / "run.py").write_text("print('hi')", encoding="utf-8") + (scripts_dir / "run.py").write_text("print('hi')", encoding="utf-8") def sync_runner(skill, script, args=None): return f"sync: {script.name} args={args}" @@ -2668,17 +2900,18 @@ def sync_runner(skill, script, args=None): ) await _init_provider(provider) run_tool = next(t for t in _ctx(provider)[2] if hasattr(t, "name") and t.name == "run_skill_script") - result = await run_tool.func(skill_name="my-skill", script_name="run.py", args={"key": "val"}) - assert result == "sync: run.py args={'key': 'val'}" + result = await run_tool.func(skill_name="my-skill", script_name="scripts/run.py", args={"key": "val"}) + assert result == "sync: scripts/run.py args={'key': 'val'}" async def test_file_skills_with_callback_runner(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + scripts_dir = skill_dir / "scripts" + scripts_dir.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (skill_dir / "run.py").write_text("print('hi')", encoding="utf-8") + (scripts_dir / "run.py").write_text("print('hi')", encoding="utf-8") provider = SkillsProvider.from_paths( str(tmp_path), @@ -2712,12 +2945,13 @@ async def test_combined_skills(self, tmp_path: Path) -> None: async def test_file_scripts_without_runner_no_error_at_init(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + scripts_dir = skill_dir / "scripts" + scripts_dir.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (skill_dir / "run.py").write_text("print('hi')", encoding="utf-8") + (scripts_dir / "run.py").write_text("print('hi')", encoding="utf-8") provider = SkillsProvider.from_paths(str(tmp_path)) # Initialization succeeds; the error now surfaces at script.run() time @@ -2993,7 +3227,23 @@ async def test_custom_template_without_runner_placeholder_raises(self) -> None: class TestFileScriptDiscovery: """Tests for automatic .py script discovery in skill directories.""" - async def test_discovers_py_files(self, tmp_path: Path) -> None: + async def test_discovers_py_files_in_scripts_dir(self, tmp_path: Path) -> None: + skill_dir = tmp_path / "my-skill" + scripts_dir = skill_dir / "scripts" + scripts_dir.mkdir(parents=True) + (skill_dir / "SKILL.md").write_text( + "---\nname: my-skill\ndescription: test\n---\nBody", + encoding="utf-8", + ) + (scripts_dir / "analyze.py").write_text("print('hi')", encoding="utf-8") + + skills = await _discover_file_skills_for_test(str(tmp_path)) + assert "my-skill" in skills + assert len(skills["my-skill"].scripts) == 1 + assert skills["my-skill"].scripts[0].name == "scripts/analyze.py" + + async def test_root_py_files_not_discovered_by_default(self, tmp_path: Path) -> None: + """Scripts at the skill root are NOT discovered with default directories.""" skill_dir = tmp_path / "my-skill" skill_dir.mkdir() (skill_dir / "SKILL.md").write_text( @@ -3004,8 +3254,7 @@ async def test_discovers_py_files(self, tmp_path: Path) -> None: skills = await _discover_file_skills_for_test(str(tmp_path)) assert "my-skill" in skills - assert len(skills["my-skill"].scripts) == 1 - assert skills["my-skill"].scripts[0].name == "analyze.py" + assert len(skills["my-skill"].scripts) == 0 async def test_discovered_script_has_absolute_full_path(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" @@ -3024,7 +3273,8 @@ async def test_discovered_script_has_absolute_full_path(self, tmp_path: Path) -> expected = str(Path(str(skill_dir), "scripts", "generate.py")) assert script.full_path == expected - async def test_discovers_nested_scripts(self, tmp_path: Path) -> None: + async def test_scripts_not_discovered_recursively(self, tmp_path: Path) -> None: + """Scripts inside subdirectories of scripts/ are NOT discovered (non-recursive).""" skill_dir = tmp_path / "my-skill" scripts_dir = skill_dir / "scripts" scripts_dir.mkdir(parents=True) @@ -3032,11 +3282,16 @@ async def test_discovers_nested_scripts(self, tmp_path: Path) -> None: "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (scripts_dir / "generate.py").write_text("print('gen')", encoding="utf-8") + # File directly in scripts/ is discovered + (scripts_dir / "top.py").write_text("print('top')", encoding="utf-8") + # File in scripts/sub/ is NOT discovered + sub_dir = scripts_dir / "sub" + sub_dir.mkdir() + (sub_dir / "nested.py").write_text("print('nested')", encoding="utf-8") skills = await _discover_file_skills_for_test(str(tmp_path)) assert len(skills["my-skill"].scripts) == 1 - assert skills["my-skill"].scripts[0].name == "scripts/generate.py" + assert skills["my-skill"].scripts[0].name == "scripts/top.py" async def test_no_scripts_when_no_py_files(self, tmp_path: Path) -> None: skill_dir = tmp_path / "my-skill" @@ -3057,36 +3312,38 @@ class TestCustomScriptExtensions: async def test_custom_script_extensions_via_get_skills(self, tmp_path: Path) -> None: """get_skills() forwards script_extensions to _discover_script_files.""" skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + scripts_dir = skill_dir / "scripts" + scripts_dir.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (skill_dir / "analyze.py").write_text("print('hi')", encoding="utf-8") - (skill_dir / "run.sh").write_text("#!/bin/bash", encoding="utf-8") + (scripts_dir / "analyze.py").write_text("print('hi')", encoding="utf-8") + (scripts_dir / "run.sh").write_text("#!/bin/bash", encoding="utf-8") # Default: only .py discovered skills_default = await _discover_file_skills_for_test(str(tmp_path)) script_names_default = [s.name for s in skills_default["my-skill"].scripts] - assert "analyze.py" in script_names_default - assert "run.sh" not in script_names_default + assert "scripts/analyze.py" in script_names_default + assert "scripts/run.sh" not in script_names_default # Custom: only .sh discovered skills_custom = await _discover_file_skills_for_test(str(tmp_path), script_extensions=(".sh",)) script_names_custom = [s.name for s in skills_custom["my-skill"].scripts] - assert "run.sh" in script_names_custom - assert "analyze.py" not in script_names_custom + assert "scripts/run.sh" in script_names_custom + assert "scripts/analyze.py" not in script_names_custom async def test_custom_script_extensions_via_provider(self, tmp_path: Path) -> None: """SkillsProvider accepts custom script_extensions.""" skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + scripts_dir = skill_dir / "scripts" + scripts_dir.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (skill_dir / "analyze.py").write_text("print('hi')", encoding="utf-8") - (skill_dir / "run.sh").write_text("#!/bin/bash", encoding="utf-8") + (scripts_dir / "analyze.py").write_text("print('hi')", encoding="utf-8") + (scripts_dir / "run.sh").write_text("#!/bin/bash", encoding="utf-8") # Only discover .sh scripts provider = SkillsProvider.from_paths( @@ -3097,20 +3354,21 @@ async def test_custom_script_extensions_via_provider(self, tmp_path: Path) -> No await _init_provider(provider) skill = _ctx(provider)[0]["my-skill"] script_names = [s.name for s in skill.scripts] - assert "run.sh" in script_names - assert "analyze.py" not in script_names + assert "scripts/run.sh" in script_names + assert "scripts/analyze.py" not in script_names async def test_multiple_script_extensions(self, tmp_path: Path) -> None: """Multiple script extensions can be specified.""" skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + scripts_dir = skill_dir / "scripts" + scripts_dir.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (skill_dir / "analyze.py").write_text("print('hi')", encoding="utf-8") - (skill_dir / "run.sh").write_text("#!/bin/bash", encoding="utf-8") - (skill_dir / "notes.txt").write_text("notes", encoding="utf-8") + (scripts_dir / "analyze.py").write_text("print('hi')", encoding="utf-8") + (scripts_dir / "run.sh").write_text("#!/bin/bash", encoding="utf-8") + (scripts_dir / "notes.txt").write_text("notes", encoding="utf-8") provider = SkillsProvider.from_paths( str(tmp_path), @@ -3120,9 +3378,9 @@ async def test_multiple_script_extensions(self, tmp_path: Path) -> None: await _init_provider(provider) skill = _ctx(provider)[0]["my-skill"] script_names = [s.name for s in skill.scripts] - assert "analyze.py" in script_names - assert "run.sh" in script_names - assert "notes.txt" not in script_names + assert "scripts/analyze.py" in script_names + assert "scripts/run.sh" in script_names + assert "scripts/notes.txt" not in script_names def test_default_script_extensions_unchanged(self) -> None: """DEFAULT_SCRIPT_EXTENSIONS contains only .py.""" @@ -4275,21 +4533,22 @@ async def test_file_skills_source_discovers_skills(self, tmp_path: Path) -> None async def test_file_skills_source_with_extensions(self, tmp_path: Path) -> None: """FileSkillsSource resource_extensions controls extension filtering.""" skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + refs = skill_dir / "references" + refs.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: Test skill.\n---\nBody.", encoding="utf-8", ) - (skill_dir / "data.json").write_text("{}", encoding="utf-8") - (skill_dir / "data.csv").write_text("a,b", encoding="utf-8") + (refs / "data.json").write_text("{}", encoding="utf-8") + (refs / "data.csv").write_text("a,b", encoding="utf-8") # Only allow .json resources source = FileSkillsSource(str(tmp_path), resource_extensions=(".json",)) skills = await source.get_skills() assert len(skills) == 1 resource_names = [r.name for r in skills[0].resources] - assert "data.json" in resource_names - assert "data.csv" not in resource_names + assert "references/data.json" in resource_names + assert "references/data.csv" not in resource_names async def test_in_memory_skills_source_returns_all_skills(self) -> None: """InMemorySkillsSource returns all provided skills.""" @@ -4512,12 +4771,13 @@ async def test_dedup_across_sources(self) -> None: async def test_file_source_with_script_runner(self, tmp_path: Path) -> None: """FileSkillsSource with script_runner enables script execution.""" skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + scripts_dir = skill_dir / "scripts" + scripts_dir.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (skill_dir / "run.py").write_text("print('hi')", encoding="utf-8") + (scripts_dir / "run.py").write_text("print('hi')", encoding="utf-8") source = DeduplicatingSkillsSource(FileSkillsSource(str(tmp_path), script_runner=_noop_script_runner)) provider = SkillsProvider(source) @@ -4547,12 +4807,13 @@ async def test_empty_source(self) -> None: async def test_per_source_runner(self, tmp_path: Path) -> None: """Per-source script runner is used when set on FileSkillsSource.""" skill_dir = tmp_path / "my-skill" - skill_dir.mkdir() + scripts_dir = skill_dir / "scripts" + scripts_dir.mkdir(parents=True) (skill_dir / "SKILL.md").write_text( "---\nname: my-skill\ndescription: test\n---\nBody", encoding="utf-8", ) - (skill_dir / "run.py").write_text("print('hi')", encoding="utf-8") + (scripts_dir / "run.py").write_text("print('hi')", encoding="utf-8") call_log: list[str] = [] @@ -4566,7 +4827,7 @@ async def source_runner(skill: Any, script: Any, args: Any = None) -> str: # The source-level runner should be discovered and used run_tool = next(t for t in _ctx(provider)[2] if hasattr(t, "name") and t.name == "run_skill_script") - result = await run_tool.func(skill_name="my-skill", script_name="run.py") + result = await run_tool.func(skill_name="my-skill", script_name="scripts/run.py") assert result == "source" assert call_log == ["source"] From 3d6b5b3210ede28c32c571f0da4b452dcb0c6c22 Mon Sep 17 00:00:00 2001 From: SergeyMenshykh Date: Wed, 13 May 2026 12:33:00 +0100 Subject: [PATCH 2/3] Address PR review: tighten path validation and add containment guard - Narrow Windows absolute path check to proper drive-root pattern (re.match r'^[A-Za-z]:[/\\]') to avoid rejecting valid POSIX names - Add _is_path_within_directory guard before _has_symlink_in_path in both discovery methods to prevent ValueError on escaped paths Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../packages/core/agent_framework/_skills.py | 52 +++++++++++++------ 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/python/packages/core/agent_framework/_skills.py b/python/packages/core/agent_framework/_skills.py index b2c93d2248..fcef5f6665 100644 --- a/python/packages/core/agent_framework/_skills.py +++ b/python/packages/core/agent_framework/_skills.py @@ -2378,7 +2378,7 @@ def _validate_and_normalize_directory_names( if ( os.path.isabs(directory) or normalized.startswith("/") - or (len(normalized) >= 2 and normalized[1] == ":") + or re.match(r"^[A-Za-z]:[/\\]", directory) ): logger.warning( "Skipping directory '%s': absolute paths are not allowed.", @@ -2441,14 +2441,23 @@ def _discover_resource_files( if not target_dir.is_dir(): continue - # Directory-level symlink check for non-root directories - if not is_root and FileSkillsSource._has_symlink_in_path(resolved_target, root_directory_path): - logger.warning( - "Skipping resource directory '%s': symlink detected in path under skill directory '%s'", - directory, - skill_dir_path, - ) - continue + # Directory-level containment and symlink checks for non-root directories + if not is_root: + if not FileSkillsSource._is_path_within_directory(resolved_target, root_directory_path): + logger.warning( + "Skipping resource directory '%s': resolves outside skill directory '%s'", + directory, + skill_dir_path, + ) + continue + + if FileSkillsSource._has_symlink_in_path(resolved_target, root_directory_path): + logger.warning( + "Skipping resource directory '%s': symlink detected in path under skill directory '%s'", + directory, + skill_dir_path, + ) + continue # Scan top-level files only (non-recursive) within this directory try: @@ -2535,14 +2544,23 @@ def _discover_script_files( if not target_dir.is_dir(): continue - # Directory-level symlink check for non-root directories - if not is_root and FileSkillsSource._has_symlink_in_path(resolved_target, root_directory_path): - logger.warning( - "Skipping script directory '%s': symlink detected in path under skill directory '%s'", - directory, - skill_dir_path, - ) - continue + # Directory-level containment and symlink checks for non-root directories + if not is_root: + if not FileSkillsSource._is_path_within_directory(resolved_target, root_directory_path): + logger.warning( + "Skipping script directory '%s': resolves outside skill directory '%s'", + directory, + skill_dir_path, + ) + continue + + if FileSkillsSource._has_symlink_in_path(resolved_target, root_directory_path): + logger.warning( + "Skipping script directory '%s': symlink detected in path under skill directory '%s'", + directory, + skill_dir_path, + ) + continue # Scan top-level files only (non-recursive) within this directory try: From fc2ee0fdc4e974519f5a9864ffee85758236b920 Mon Sep 17 00:00:00 2001 From: SergeyMenshykh Date: Thu, 14 May 2026 09:52:00 +0100 Subject: [PATCH 3/3] Log warning on OSError during directory listing in skill discovery Address review comment: _discover_resource_files and _discover_script_files previously swallowed OSError silently when iterdir() failed. Now log a warning so permission errors and transient FS failures are visible instead of making resource/script directories silently disappear. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../packages/core/agent_framework/_skills.py | 10 ++++ .../packages/core/tests/core/test_skills.py | 48 +++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/python/packages/core/agent_framework/_skills.py b/python/packages/core/agent_framework/_skills.py index fcef5f6665..0834371995 100644 --- a/python/packages/core/agent_framework/_skills.py +++ b/python/packages/core/agent_framework/_skills.py @@ -2463,6 +2463,11 @@ def _discover_resource_files( try: entries = list(target_dir.iterdir()) except OSError: + logger.warning( + "Failed to list resource directory '%s' in skill directory '%s'; skipping.", + directory, + skill_dir_path, + ) continue for resource_file in entries: @@ -2566,6 +2571,11 @@ def _discover_script_files( try: entries = list(target_dir.iterdir()) except OSError: + logger.warning( + "Failed to list script directory '%s' in skill directory '%s'; skipping.", + directory, + skill_dir_path, + ) continue for script_file in entries: diff --git a/python/packages/core/tests/core/test_skills.py b/python/packages/core/tests/core/test_skills.py index 2dbc9b9363..ff23a2e2f9 100644 --- a/python/packages/core/tests/core/test_skills.py +++ b/python/packages/core/tests/core/test_skills.py @@ -1533,6 +1533,54 @@ def test_extension_matching_is_case_insensitive(self, tmp_path: Path) -> None: assert len(resources) == 1 +class TestDiscoverFilesOSErrorWarning: + """OSError during directory listing should log a warning, not fail silently.""" + + def test_resource_discovery_warns_on_oserror(self, tmp_path: Path, caplog: pytest.LogCaptureFixture) -> None: + """_discover_resource_files logs a warning when iterdir() raises OSError.""" + skill_dir = tmp_path / "my-skill" + refs = skill_dir / "references" + refs.mkdir(parents=True) + (refs / "guide.md").write_text("content", encoding="utf-8") + + original_iterdir = Path.iterdir + + def _patched_iterdir(self: Path) -> Any: + if self.name == "references": + raise PermissionError("access denied") + return original_iterdir(self) + + import unittest.mock + + with unittest.mock.patch.object(Path, "iterdir", _patched_iterdir): + resources = FileSkillsSource._discover_resource_files(str(skill_dir)) + + assert resources == [] + assert any("Failed to list resource directory" in r.message for r in caplog.records) + + def test_script_discovery_warns_on_oserror(self, tmp_path: Path, caplog: pytest.LogCaptureFixture) -> None: + """_discover_script_files logs a warning when iterdir() raises OSError.""" + skill_dir = tmp_path / "my-skill" + scripts_dir = skill_dir / "scripts" + scripts_dir.mkdir(parents=True) + (scripts_dir / "run.py").write_text("print('hi')", encoding="utf-8") + + original_iterdir = Path.iterdir + + def _patched_iterdir(self: Path) -> Any: + if self.name == "scripts": + raise PermissionError("access denied") + return original_iterdir(self) + + import unittest.mock + + with unittest.mock.patch.object(Path, "iterdir", _patched_iterdir): + scripts = FileSkillsSource._discover_script_files(str(skill_dir)) + + assert scripts == [] + assert any("Failed to list script directory" in r.message for r in caplog.records) + + class TestValidateAndNormalizeDirectoryNames: """Tests for _validate_and_normalize_directory_names."""