diff --git a/docs/error-codes.md b/docs/error-codes.md new file mode 100644 index 0000000..b973f0e --- /dev/null +++ b/docs/error-codes.md @@ -0,0 +1,325 @@ +# mcp-synology Error Codes + +This is the reference for every `code` value mcp-synology can return in a +structured error envelope. It is linked from the `help_url` field of those +envelopes. Each section's heading matches the `code` value exactly, so you +can jump directly from any error to the section that explains it. + +If you arrived here from an error envelope, scroll to the section matching the +error's `code` field. If you're browsing, the sections below are grouped by +category. + +> **Convention:** all section headings use the literal error code (e.g. +> `auth_failed`). The unit test in `tests/core/test_help_urls.py` enforces a +> 1:1 mapping between codes registered in `core/errors.HELP_URLS` and headings +> here, so renaming a section will fail CI. + +--- + +_Authentication and access_ + +## auth_failed + +**DSM rejected the credentials, requires 2FA, or the account is locked.** + +Common causes: + +- Wrong username or password in the credential store. +- 2FA is enforced on the account but has not been bootstrapped via `mcp-synology setup`. +- The account is disabled in DSM. +- The IP has been auto-blocked after too many failed attempts. + +Fix: + +1. Run `mcp-synology check -v`. The verbose output exercises the full auth chain + and reports which step fails (lookup → request → response). +2. If 2FA is required, run `mcp-synology setup` and complete the OTP exchange. + This stores the device token in the keyring so subsequent runs do not need + the OTP code. +3. If the IP is auto-blocked, sign in to DSM directly and clear the entry at + **Control Panel → Security → Account → Auto Block → Allow / Block List**. +4. Verify the account has access to FileStation: **Control Panel → User & Group + → [account] → Edit → Applications**. The account must be allowed to use + File Station. + +`auth_failed` is **not** retryable. mcp-synology will not auto-retry on this +code — fix the underlying credential or 2FA state before re-running. + +## permission_denied + +**The account authenticated but DSM rejected the operation (DSM error 105).** + +This is *not* a session issue. mcp-synology specifically does **not** re-authenticate +on code 105 — that would mask the real problem and risk lockout. + +Common causes: + +- The DSM user is not in the `administrators` group, and the operation requires + admin (e.g., `get_resource_usage`, `get_system_info`). +- The shared folder ACL excludes this user. +- The user has read-only access to a folder being written to. +- The user is restricted to certain shares and the request targets a different one. + +Fix: + +1. Identify which DSM account mcp-synology is using: check your config file or + run `mcp-synology check -v`. +2. For shared-folder operations: **Control Panel → Shared Folder → [folder] → + Edit → Permissions**. Confirm the account has the access level the tool needs + (read-only is not enough for upload, copy, move, delete, rename, create_folder). +3. For system-level tools: add the account to `administrators` via **Control + Panel → User & Group → [account] → Edit → User Groups**. If you cannot grant + admin, those tools are not usable for this account — that is a deliberate + DSM restriction, not an mcp-synology bug. +4. If you are using a dedicated service account (recommended): each share you + want to access needs explicit permission for that account. Service accounts + start with no access. + +## api_not_found + +**DSM does not expose the API the tool needs.** + +Common causes: + +- DSM version is too old (mcp-synology requires DSM 7.0 or later). +- The DSM package providing the API is not installed (e.g., File Station). +- The API exists but the package is stopped. + +Fix: + +1. Check the DSM version: **Control Panel → Info Center → General → DSM Version**. +2. Verify File Station is installed and running: **Package Center → Installed**. + If missing, install it from **Package Center**. +3. Run `mcp-synology check -v`. Debug logging dumps the cached API list returned + by `SYNO.API.Info`, which shows exactly which APIs the NAS reports. +4. If the API should exist but is not listed, restart the relevant package via + **Package Center → [package] → Action → Stop / Run**, or reboot the NAS. + +--- + +_Paths and files_ + +## not_found + +**The path could not be resolved on the NAS, or a local file in upload/download +does not exist.** + +Common causes: + +- Typo or wrong case in the share name (case sensitivity depends on the volume's filesystem). +- Share name omitted: paths must start with a real share, e.g. `/volume1` is wrong, `/photos` is right. +- The user lacks list permission on a parent directory, so DSM reports "not + found" rather than "denied" — DSM does this deliberately to avoid leaking + directory existence. +- For local-side errors: the local file (upload) or destination folder + (download) does not exist or is given as a relative path that resolves + somewhere unexpected. + +Fix: + +1. Call `list_shares` to see exact share names. They are case-sensitive on + Btrfs and case-insensitive on ext4 — when in doubt, match the displayed case. +2. Call `list_files` on the parent directory to confirm the target exists with + the expected name and case. +3. For uncertain locations, call `search_files` with a substring of the name. +4. If the share lists fine but a subdirectory says "not found", check + `permission_denied` — the user may lack list permission on the parent. +5. For local paths in upload/download: always pass absolute paths. Relative + paths resolve against the mcp-synology process working directory, which is + not the same as your shell's working directory when launched by Claude + Desktop or another MCP client. + +## already_exists + +**A file with the target name already exists at the destination.** + +This fires for upload, download, rename, copy, and move when the destination +is occupied and the caller did not opt in to overwriting. + +Fix: + +- For `upload_file`, `download_file`, copy, and move: pass `overwrite=true` to + replace the existing file. Only do this if you actually want the existing + data gone — overwrite is **not** transactional, so a failure mid-write can + leave the destination in a corrupted state. +- For `rename`: choose a different target name, or `delete_files` the existing + target first. +- If you need atomicity: write to a temporary name and rename on success + rather than relying on `overwrite=true`. + +## invalid_parameter + +**The input contains a value mcp-synology or DSM rejects before sending the +operation.** + +Common causes: + +- A filename contains a forbidden character: `/ \ : * ? " < > |`. +- The `new_name` argument to `rename` contains `/` (path separators are not + allowed in rename — use copy/move to relocate a file). +- A path is empty, all whitespace, or has no share component. +- A search keyword normalizes to empty after wildcard processing. + +Fix: + +- Strip or substitute the forbidden character set above. DSM enforces the + strict union of Windows and POSIX filename rules — even if your client OS + allows a character, DSM may not. +- For `rename`: pass only the new base name, not a path. To move a file across + directories, use the copy or move tool. +- For paths: ensure the first component is a real share name from `list_shares`. +- For search: provide a keyword with at least one non-wildcard character. + +## filesystem_error + +**A local OSError occurred reading or writing a file (uploads and downloads only).** + +This is *not* a DSM error — it's the local operating system telling +mcp-synology that the file operation failed. + +Common causes: + +- Local file lacks read permission (upload) or destination folder lacks write + permission (download). +- The destination folder does not exist. +- The local disk is read-only or the inode is locked. +- The path contains characters the local filesystem rejects. + +Fix: + +1. Check local permissions: `ls -la `. The relevant user is whoever + launched the mcp-synology process — for Claude Desktop, that is the user + running Claude Desktop, which is **not** necessarily root or a privileged user. +2. For downloads: ensure the destination folder exists and is writable. Use + absolute paths. +3. The error message includes the OSError details (ENOENT, EACCES, EROFS, + etc.) — read those for the precise cause before guessing. + +--- + +_Storage and resources_ + +## disk_full + +**No space available on the destination volume, or the user's quota is exhausted.** + +Maps to DSM error 416 (volume full) and 415 (quota exceeded). For +`download_file`, this also covers local ENOSPC. + +Fix: + +1. Check NAS storage: **Storage Manager → Storage** in DSM. Free space, expand + the volume, or move data off it. +2. Check user quota: **Control Panel → User & Group → [account] → Edit → + Quota**. Quotas can fill long before the underlying volume does, and the + error looks the same. +3. For local downloads: `df -h` on the destination filesystem. +4. After freeing space, the operation is safe to retry — `disk_full` is marked + `retryable=true` in the error envelope. + +## timeout + +**A long-running DSM background task did not complete within the timeout window.** + +Affects search, copy, move, delete, and `get_dir_size` — these all run as +asynchronous DSM tasks that mcp-synology polls. + +Common causes: + +- The operation is genuinely large (deleting a directory tree with millions of + files; searching a non-indexed share). +- The DSM search service is overloaded. Search on shares with + `has_not_index_share=true` is unreliable under load. +- The NAS is starved for CPU or I/O by another workload (Plex transcoding, + SMR drive write amplification, RAID resync, snapshot replication). +- Orphaned background tasks from prior aborted runs are competing for resources. + +Fix: + +1. Check NAS health first: call `get_resource_usage` to confirm CPU and RAM + are not pinned. If they are, fix that before retrying. +2. For search: narrow the path to a single share rather than the root, use a + more specific keyword, and avoid rapid-fire repeats. The search service can + exhaust itself and refuse new tasks. +3. For copy, move, and delete: split the operation into smaller batches. +4. To clear suspected orphaned tasks: stop and start the File Station package + from **Package Center**. mcp-synology cleans up its own tasks via + `try/finally`, but tasks from earlier crashed runs may persist. +5. `timeout` is marked `retryable=true`, but only retry after addressing the + root cause. Blind retry on an overloaded NAS makes the situation worse. + +## unavailable + +**mcp-synology called DSM successfully, but DSM returned an empty payload +where data was expected.** + +Affects `get_system_info`, `get_resource_usage`, and similar metric tools. +The API responded `success=true` but the data block was missing or empty. + +Common causes: + +- DSM is in a half-initialized state immediately after boot or upgrade. + System monitor services start asynchronously and may not be ready yet. +- The user lacks permission to read the requested fields. DSM strips fields + silently in some cases rather than returning a permission error. +- The DSM package providing the metric is stopped. + +Fix: + +1. Wait one to two minutes after a NAS reboot before calling system tools. +2. Verify the DSM user is in the `administrators` group for system-level tools. + See `permission_denied` for the rationale. +3. Open **Resource Monitor** in DSM directly. If the panel is empty or + incomplete there too, the issue is on the NAS side, not in mcp-synology. +4. Run `mcp-synology check -v` and reproduce the call — the debug log shows + the raw DSM response, which usually tells you which field is missing. + +--- + +_Catch-all_ + +## filestation_error + +**A File Station API error that mcp-synology recognized as File-Station-scoped +but did not map to a more specific code above.** + +This is the fall-through for `SYNO.FileStation.*` calls whose DSM error code +isn't in the specific-handler list (400–421 range is mostly covered; codes +like 402 "System too busy" and 900 "Unexpected server error" land here). The +envelope includes the DSM numeric code in the message text, so you can look +it up directly. + +Fix: + +1. Read the numeric DSM code from the message (e.g., "DSM error 402"). This + is the authoritative identifier — the `filestation_error` label is just + the bucket. +2. Cross-reference against the + [Synology File Station API documentation](https://global.download.synology.com/download/Document/Software/DeveloperGuide/Package/FileStation/All/enu/Synology_File_Station_API_Guide.pdf) + for that specific code. +3. **Code 402 ("System too busy"):** wait and retry. This is genuinely + transient and often resolves on its own within a minute. +4. **Code 900 ("Unexpected server error"):** check **Log Center** in DSM for + matching events. This usually indicates an internal DSM fault and may + require a package restart or reboot. +5. If you see a specific code repeatedly, [open an issue](https://github.com/cmeans/mcp-synology/issues/new) + with the code and operation so we can add it to `core/errors.py`. + +## dsm_error + +**A DSM error that does not map to one of the specific codes above.** + +Typically a background-task failure with a non-standard error structure, or a +DSM error code mcp-synology does not recognize yet. + +Fix: + +1. Run `mcp-synology check -v` and reproduce the failing operation. Debug logs + include the raw DSM response with the exact error code. +2. Check **Log Center** in DSM for matching events around the same timestamp. +3. If you see a recurring DSM error code that mcp-synology does not handle + specifically, [open an issue](https://github.com/cmeans/mcp-synology/issues/new) + with the code, the operation that triggered it, and the debug log snippet. + New codes are added to `core/errors.py` as we encounter them. +4. For background task failures (copy, move, delete): the task may have + partially completed. List the destination to confirm state before retrying. diff --git a/src/mcp_synology/core/errors.py b/src/mcp_synology/core/errors.py index 4e83635..95b1f7c 100644 --- a/src/mcp_synology/core/errors.py +++ b/src/mcp_synology/core/errors.py @@ -2,51 +2,151 @@ from __future__ import annotations +from enum import StrEnum + +# Help URLs point at sections of our own error-code reference rather than +# Synology's KB. The KB is sparsely indexed and rarely matches the semantics +# of an MCP error — our page can say "use overwrite=true" and "run +# mcp-synology check -v", which the vendor docs never will. +# +# Adding a new error code: add a member to ErrorCode below AND add a +# matching `## ` section to docs/error-codes.md. The unit test in +# tests/core/test_help_urls.py will fail if the two drift out of sync. + + +class ErrorCode(StrEnum): + """Canonical error codes emitted in structured error envelopes. + + Single source of truth for every ``code`` value that can appear in + an error response. ``SynologyError`` subclasses set their + ``error_code`` to a member of this enum, and call sites that emit + codes directly (via ``error_response(code=...)``) pass a member + rather than a bare string — this prevents typos and gives mypy a + place to catch drift. + + Members are strings (``StrEnum``), so dict lookups, JSON + serialization, and equality against string literals all work without + any conversion. + """ + + # Auth and access + AUTH_FAILED = "auth_failed" + SESSION_EXPIRED = "session_expired" + PERMISSION_DENIED = "permission_denied" + API_NOT_FOUND = "api_not_found" + + # Paths and files + NOT_FOUND = "not_found" + ALREADY_EXISTS = "already_exists" + INVALID_PARAMETER = "invalid_parameter" + FILESYSTEM_ERROR = "filesystem_error" + + # Storage and runtime + DISK_FULL = "disk_full" + TIMEOUT = "timeout" + UNAVAILABLE = "unavailable" + + # Catch-alls + FILESTATION_ERROR = "filestation_error" + DSM_ERROR = "dsm_error" + + +GITHUB_DOCS_BASE = "https://github.com/cmeans/mcp-synology/blob/main/docs/error-codes.md" + +# Codes that are intentionally NOT surfaced to users and therefore need +# no documentation section. ``session_expired`` is auto-retried at the +# core-client layer; if it ever reaches a user, the retry path itself +# has failed and the underlying failure's code is what they see. +_CODES_WITHOUT_HELP_URL: frozenset[ErrorCode] = frozenset({ErrorCode.SESSION_EXPIRED}) + +HELP_URLS: dict[str, str] = { + code.value: f"{GITHUB_DOCS_BASE}#{code.value}" + for code in ErrorCode + if code not in _CODES_WITHOUT_HELP_URL +} + class SynologyError(Exception): """Base exception for all Synology DSM errors.""" - def __init__(self, message: str, code: int | None = None, suggestion: str | None = None): + error_code: ErrorCode = ErrorCode.DSM_ERROR + retryable: bool = False + + def __init__( + self, + message: str, + code: int | None = None, + suggestion: str | None = None, + help_url: str | None = None, + ): self.code = code self.suggestion = suggestion + # Per-instance override. When None (the usual case), callers resolve + # the URL from HELP_URLS using error_code at response-building time. + self.help_url = help_url super().__init__(message) class AuthenticationError(SynologyError): """Authentication failed (bad credentials, 2FA required, etc.).""" + error_code = ErrorCode.AUTH_FAILED + class SessionExpiredError(SynologyError): - """Session expired or invalidated (codes 106, 107, 119).""" + """Session expired or invalidated (codes 106, 107, 119). + + Intentionally has no entry in HELP_URLS — session expiry is auto-retried + by the core client and should never be surfaced to an end user. If it + ever is, that indicates the retry path itself failed, which is a bug. + """ + + error_code = ErrorCode.SESSION_EXPIRED + retryable = True class SynologyPermissionError(SynologyError): """Permission denied (code 105). NOT a session issue — never re-auth on this.""" + error_code = ErrorCode.PERMISSION_DENIED + class ApiNotFoundError(SynologyError): """Requested API does not exist on this NAS.""" + error_code = ErrorCode.API_NOT_FOUND + class FileStationError(SynologyError): """Base exception for File Station API errors.""" + error_code = ErrorCode.FILESTATION_ERROR + class PathNotFoundError(FileStationError): """Path not found (code 408).""" + error_code = ErrorCode.NOT_FOUND + class SynologyFileExistsError(FileStationError): """File already exists at destination (code 414).""" + error_code = ErrorCode.ALREADY_EXISTS + class DiskFullError(FileStationError): """No space left on device (code 416).""" + error_code = ErrorCode.DISK_FULL + retryable = True + class IllegalNameError(FileStationError): """Invalid file or folder name (codes 418, 419).""" + error_code = ErrorCode.INVALID_PARAMETER + # Common DSM error codes (100-series) shared across all APIs. COMMON_ERROR_CODES: dict[int, tuple[str, str]] = { diff --git a/src/mcp_synology/core/formatting.py b/src/mcp_synology/core/formatting.py index 6a2721e..dcc19aa 100644 --- a/src/mcp_synology/core/formatting.py +++ b/src/mcp_synology/core/formatting.py @@ -3,7 +3,12 @@ from __future__ import annotations import datetime +import json from dataclasses import dataclass +from typing import TYPE_CHECKING, Any, NoReturn + +if TYPE_CHECKING: + from mcp_synology.core.errors import ErrorCode, SynologyError def format_table( @@ -152,6 +157,87 @@ def format_error( return "\n".join(lines) +def error_response( + code: ErrorCode, + message: str, + *, + retryable: bool, + param: str | None = None, + value: Any | None = None, + valid: list[str] | None = None, + suggestion: str | None = None, + help_url: str | None = None, +) -> NoReturn: + """Build a structured error envelope and raise ToolError. + + The MCP SDK wraps ToolError in a CallToolResult with isError=True, + so clients get proper error signaling. The JSON envelope provides + structured fields for smart clients alongside a human-readable message. + + ``code`` is typed as ``ErrorCode`` rather than ``str`` so a typo at a + call site becomes a mypy error rather than a silent envelope with a + missing ``help_url``. ``StrEnum`` members are strings at runtime, so + JSON serialization and dict lookups still work unchanged. + + When ``help_url`` is not provided, the code is looked up in + ``core.errors.HELP_URLS`` so every registered code gets a link + automatically without the caller having to know the URL. Pass + ``help_url`` explicitly to override the registered default. + + Raises: + ToolError: always — this function never returns. + """ + from mcp.server.fastmcp.exceptions import ToolError + + from mcp_synology.core.errors import HELP_URLS + + error: dict[str, Any] = { + "code": code.value, + "message": message, + "retryable": retryable, + } + if param is not None: + error["param"] = param + if value is not None: + error["value"] = value + if valid is not None: + error["valid"] = valid + if suggestion is not None: + error["suggestion"] = suggestion + + resolved_help_url = help_url if help_url is not None else HELP_URLS.get(code.value) + if resolved_help_url is not None: + error["help_url"] = resolved_help_url + + # ``default=str`` keeps a future caller from crashing the error path + # by passing a non-JSON-serializable ``value`` (bytes, a custom object, + # etc.). All current callers pass strings, so this is a safety net. + raise ToolError(json.dumps({"status": "error", "error": error}, default=str)) + + +def synology_error_response(operation: str, exc: SynologyError) -> NoReturn: + """Convert a caught SynologyError into a structured error response. + + Maps SynologyError attributes to structured error fields and raises + ToolError with a JSON envelope. The operation name provides context + (e.g., "List files", "Upload"). + + Raises: + ToolError: always — this function never returns. + """ + msg = f"{operation} failed: {exc}" + if exc.code is not None: + msg = f"{operation} failed (DSM error {exc.code}): {exc}" + + error_response( + exc.error_code, + msg, + retryable=exc.retryable, + suggestion=exc.suggestion, + help_url=exc.help_url, + ) + + _SIZE_UNITS = ["B", "KB", "MB", "GB", "TB", "PB"] diff --git a/src/mcp_synology/modules/filestation/listing.py b/src/mcp_synology/modules/filestation/listing.py index 0dd5f4b..712c6ea 100644 --- a/src/mcp_synology/modules/filestation/listing.py +++ b/src/mcp_synology/modules/filestation/listing.py @@ -6,10 +6,10 @@ from mcp_synology.core.errors import SynologyError from mcp_synology.core.formatting import ( - format_error, format_size, format_table, format_timestamp, + synology_error_response, ) from mcp_synology.modules.filestation.helpers import ( file_type_icon, @@ -61,7 +61,7 @@ async def list_shares( }, ) except SynologyError as e: - return format_error("List shares", str(e), e.suggestion) + synology_error_response("List shares", e) shares = data.get("shares", []) if not shares: @@ -134,7 +134,7 @@ async def list_files( try: data = await client.request("SYNO.FileStation.List", "list", params=params) except SynologyError as e: - return format_error("List files", str(e), e.suggestion) + synology_error_response("List files", e) files = data.get("files", []) total = data.get("total", len(files)) diff --git a/src/mcp_synology/modules/filestation/metadata.py b/src/mcp_synology/modules/filestation/metadata.py index f501600..35e44c2 100644 --- a/src/mcp_synology/modules/filestation/metadata.py +++ b/src/mcp_synology/modules/filestation/metadata.py @@ -6,13 +6,14 @@ import logging from typing import TYPE_CHECKING, Any -from mcp_synology.core.errors import SynologyError +from mcp_synology.core.errors import ErrorCode, SynologyError from mcp_synology.core.formatting import ( - format_error, + error_response, format_key_value, format_size, format_table, format_timestamp, + synology_error_response, ) from mcp_synology.modules.filestation.helpers import ( escape_multi_path, @@ -62,7 +63,7 @@ async def get_file_info( }, ) except SynologyError as e: - return format_error("Get file info", str(e), e.suggestion) + synology_error_response("Get file info", e) files = data.get("files", []) @@ -71,10 +72,11 @@ async def get_file_info( # Multiple files: table format if not files: - return format_error( - "Get file info", - "No file information returned.", - "Check that the paths exist.", + error_response( + ErrorCode.NOT_FOUND, + "Get file info failed: No file information returned.", + retryable=False, + suggestion="Check that the paths exist.", ) headers = ["Name", "Path", "Type", "Size", "Modified"] @@ -151,7 +153,7 @@ async def get_dir_size( params={"path": normalized}, ) except SynologyError as e: - return format_error("Get directory size", str(e), e.suggestion) + synology_error_response("Get directory size", e) taskid = start_data.get("taskid", "") @@ -159,7 +161,7 @@ async def get_dir_size( # preventing orphaned processes that consume CPU on the NAS. elapsed = 0.0 interval = 0.5 - poll_error: str | None = None + poll_error: SynologyError | None = None result: str | None = None try: @@ -171,7 +173,7 @@ async def get_dir_size( params={"taskid": taskid}, ) except SynologyError as e: - poll_error = format_error("Get directory size", str(e), e.suggestion) + poll_error = e break if status.get("finished", False): @@ -195,12 +197,13 @@ async def get_dir_size( await _stop_dirsize_task(client, taskid) if poll_error: - return poll_error + synology_error_response("Get directory size", poll_error) if result: return result - return format_error( - "Get directory size", - f"Timed out after {timeout}s.", - "The directory may be very large. Try a subdirectory.", + error_response( + ErrorCode.TIMEOUT, + f"Get directory size failed: timed out after {timeout}s.", + retryable=True, + suggestion="The directory may be very large. Try a subdirectory.", ) diff --git a/src/mcp_synology/modules/filestation/operations.py b/src/mcp_synology/modules/filestation/operations.py index 0a3c4fe..eaa01dd 100644 --- a/src/mcp_synology/modules/filestation/operations.py +++ b/src/mcp_synology/modules/filestation/operations.py @@ -6,8 +6,13 @@ import logging from typing import TYPE_CHECKING, Any -from mcp_synology.core.errors import SynologyError -from mcp_synology.core.formatting import format_error, format_size, format_status +from mcp_synology.core.errors import ErrorCode, SynologyError +from mcp_synology.core.formatting import ( + error_response, + format_size, + format_status, + synology_error_response, +) from mcp_synology.modules.filestation.helpers import ( escape_multi_path, normalize_path, @@ -64,7 +69,7 @@ async def create_folder( }, ) except SynologyError as e: - return format_error("Create folder", str(e), e.suggestion) + synology_error_response("Create folder", e) folders = data.get("folders", []) lines = [format_status(f"Created {len(folders)} folder(s):")] @@ -86,10 +91,13 @@ async def rename( # Validate new_name is just a name, not a path if "/" in new_name: - return format_error( - "Rename", - "new_name should be just a filename, not a path.", - "Use move_files to relocate files to a different directory.", + error_response( + ErrorCode.INVALID_PARAMETER, + "Rename failed: new_name should be just a filename, not a path.", + retryable=False, + param="new_name", + value=new_name, + suggestion="Use move_files to relocate files to a different directory.", ) try: @@ -102,7 +110,7 @@ async def rename( }, ) except SynologyError as e: - return format_error("Rename", str(e), e.suggestion) + synology_error_response("Rename", e) files = data.get("files", []) if files: @@ -186,7 +194,7 @@ async def _copy_move( }, ) except SynologyError as e: - return format_error(f"{operation} files", str(e), e.suggestion) + synology_error_response(f"{operation} files", e) taskid = start_data.get("taskid", "") @@ -195,7 +203,7 @@ async def _copy_move( elapsed = 0.0 interval = 0.5 status: dict[str, Any] = {} - poll_error: str | None = None + poll_error: SynologyError | None = None timed_out = False try: @@ -208,7 +216,7 @@ async def _copy_move( params={"taskid": taskid}, ) except SynologyError as e: - poll_error = format_error(f"{operation} files", str(e), e.suggestion) + poll_error = e break logger.debug("%s status: %s", operation, status) @@ -230,12 +238,13 @@ async def _copy_move( ) if poll_error: - return poll_error + synology_error_response(f"{operation} files", poll_error) if timed_out: - return format_error( - f"{operation} files", - f"Timed out after {timeout}s.", - "The operation may still be running on the NAS.", + error_response( + ErrorCode.TIMEOUT, + f"{operation} files failed: timed out after {timeout}s.", + retryable=True, + suggestion="The operation may still be running on the NAS.", ) # Check for errors in the completed task @@ -243,10 +252,11 @@ async def _copy_move( err = status["error"] err_code = err.get("code", 0) if isinstance(err, dict) else err err_path = status.get("path", "") - return format_error( - f"{operation} files", - f"DSM error code {err_code} on path: {err_path}", - "Check that source paths exist and you have permission to access them.", + error_response( + ErrorCode.DSM_ERROR, + f"{operation} files failed: DSM error code {err_code} on path: {err_path}", + retryable=False, + suggestion="Check that source paths exist and you have permission to access them.", ) # Build response @@ -291,7 +301,7 @@ async def delete_files( }, ) except SynologyError as e: - return format_error("Delete files", str(e), e.suggestion) + synology_error_response("Delete files", e) taskid = start_data.get("taskid", "") @@ -299,7 +309,7 @@ async def delete_files( elapsed = 0.0 interval = 0.5 status: dict[str, Any] = {} - poll_error: str | None = None + poll_error: SynologyError | None = None timed_out = False try: @@ -312,7 +322,7 @@ async def delete_files( params={"taskid": taskid}, ) except SynologyError as e: - poll_error = format_error("Delete files", str(e), e.suggestion) + poll_error = e break logger.debug("Delete status: %s", status) @@ -334,12 +344,13 @@ async def delete_files( ) if poll_error: - return poll_error + synology_error_response("Delete files", poll_error) if timed_out: - return format_error( - "Delete files", - f"Timed out after {timeout}s.", - "The operation may still be running on the NAS.", + error_response( + ErrorCode.TIMEOUT, + f"Delete files failed: timed out after {timeout}s.", + retryable=True, + suggestion="The operation may still be running on the NAS.", ) # Check for errors in the completed task @@ -347,10 +358,11 @@ async def delete_files( err = status["error"] err_code = err.get("code", 0) if isinstance(err, dict) else err err_path = status.get("path", "") - return format_error( - "Delete files", - f"DSM error code {err_code} on path: {err_path}", - "Check that paths exist and you have permission to delete them.", + error_response( + ErrorCode.DSM_ERROR, + f"Delete files failed: DSM error code {err_code} on path: {err_path}", + retryable=False, + suggestion="Check that paths exist and you have permission to delete them.", ) # Determine recycle bin status per share diff --git a/src/mcp_synology/modules/filestation/search.py b/src/mcp_synology/modules/filestation/search.py index 0bfcf5e..1bd1042 100644 --- a/src/mcp_synology/modules/filestation/search.py +++ b/src/mcp_synology/modules/filestation/search.py @@ -8,7 +8,12 @@ from typing import TYPE_CHECKING, Any from mcp_synology.core.errors import SynologyError -from mcp_synology.core.formatting import format_error, format_size, format_table, format_timestamp +from mcp_synology.core.formatting import ( + format_size, + format_table, + format_timestamp, + synology_error_response, +) from mcp_synology.modules.filestation.helpers import ( file_type_icon, matches_pattern, @@ -109,7 +114,7 @@ async def search_files( params=start_params, ) except SynologyError as e: - return format_error("Search files", str(e), e.suggestion) + synology_error_response("Search files", e) taskid = start_data.get("taskid", "") @@ -122,7 +127,7 @@ async def search_files( all_files: list[dict[str, Any]] = [] finished = False poll_count = 0 - poll_error: str | None = None + poll_error: SynologyError | None = None try: while (time.monotonic() - start_time) < timeout: @@ -139,7 +144,7 @@ async def search_files( }, ) except SynologyError as e: - poll_error = format_error("Search files", str(e), e.suggestion) + poll_error = e break poll_count += 1 @@ -158,7 +163,7 @@ async def search_files( await _cleanup_search_task(client, taskid, search_version, logger) if poll_error: - return poll_error + synology_error_response("Search files", poll_error) # Apply client-side exclude_pattern excluded_count = 0 diff --git a/src/mcp_synology/modules/filestation/transfer.py b/src/mcp_synology/modules/filestation/transfer.py index 204d065..02ffede 100644 --- a/src/mcp_synology/modules/filestation/transfer.py +++ b/src/mcp_synology/modules/filestation/transfer.py @@ -3,13 +3,19 @@ from __future__ import annotations import contextlib +import errno import logging import shutil from pathlib import Path, PurePosixPath from typing import TYPE_CHECKING -from mcp_synology.core.errors import SynologyError, SynologyFileExistsError -from mcp_synology.core.formatting import format_error, format_size, format_status +from mcp_synology.core.errors import ErrorCode, SynologyError, SynologyFileExistsError +from mcp_synology.core.formatting import ( + error_response, + format_size, + format_status, + synology_error_response, +) from mcp_synology.modules.filestation.helpers import normalize_path if TYPE_CHECKING: @@ -35,10 +41,13 @@ async def upload_file( """Upload a local file to a NAS folder.""" local = Path(local_path) if not local.is_file(): - return format_error( - "Upload", - f"Local file not found: {local_path}", - "Check the file path and try again.", + error_response( + ErrorCode.NOT_FOUND, + f"Upload failed: Local file not found: {local_path}", + retryable=False, + param="local_path", + value=local_path, + suggestion="Check the file path and try again.", ) file_size = local.stat().st_size @@ -59,18 +68,24 @@ async def upload_file( timeout=timeout, ) except SynologyFileExistsError: - return format_error( - "Upload", - f"File '{effective_name}' already exists in {dest}.", - "Use overwrite=true to replace the existing file.", + error_response( + ErrorCode.ALREADY_EXISTS, + f"Upload failed: File '{effective_name}' already exists in {dest}.", + retryable=False, + param="filename", + value=effective_name, + suggestion="Use overwrite=true to replace the existing file.", ) except SynologyError as e: - return format_error("Upload", str(e), e.suggestion) + synology_error_response("Upload", e) except OSError as e: - return format_error( - "Upload", - f"Failed to read local file '{local_path}': {e}", - "Check file permissions and that the file is not locked.", + error_response( + ErrorCode.FILESYSTEM_ERROR, + f"Upload failed: Failed to read local file '{local_path}': {e}", + retryable=False, + param="local_path", + value=local_path, + suggestion="Check file permissions and that the file is not locked.", ) # Report completion @@ -102,10 +117,13 @@ async def download_file( """Download a NAS file to a local directory.""" local_dir = Path(dest_folder) if not local_dir.is_dir(): - return format_error( - "Download", - f"Local directory not found: {dest_folder}", - "Check the directory path and try again.", + error_response( + ErrorCode.NOT_FOUND, + f"Download failed: Local directory not found: {dest_folder}", + retryable=False, + param="dest_folder", + value=dest_folder, + suggestion="Check the directory path and try again.", ) nas_path = normalize_path(path) @@ -113,10 +131,13 @@ async def download_file( dest_file = local_dir / effective_name if dest_file.exists() and not overwrite: - return format_error( - "Download", - f"Local file already exists: {dest_file}", - "Use overwrite=true to replace the existing file.", + error_response( + ErrorCode.ALREADY_EXISTS, + f"Download failed: Local file already exists: {dest_file}", + retryable=False, + param="dest_folder", + value=str(dest_file), + suggestion="Use overwrite=true to replace the existing file.", ) # Pre-flight disk space check using NAS file metadata. @@ -138,11 +159,13 @@ async def download_file( if nas_file_size: free_space = shutil.disk_usage(local_dir).free if nas_file_size > free_space: - return format_error( - "Download", - f"Insufficient local disk space: file is {format_size(nas_file_size)} " + error_response( + ErrorCode.DISK_FULL, + f"Download failed: Insufficient local disk space: " + f"file is {format_size(nas_file_size)} " f"but only {format_size(free_space)} free on {local_dir}.", - "Free space on the local disk or choose a different destination.", + retryable=True, + suggestion="Free space on the local disk or choose a different destination.", ) try: @@ -160,22 +183,36 @@ async def download_file( logger.debug("Cleaned up partial download: %s", dest_file) except OSError: logger.warning("Failed to clean up partial download: %s", dest_file) - return format_error("Download", str(e), e.suggestion) + synology_error_response("Download", e) except OSError as e: - # Filesystem rejected the write — illegal characters in filename on this OS, - # permission denied, disk full, path too long, etc. + # Filesystem rejected the write. Possibilities: + # - ENOSPC: disk full + # - EACCES/EPERM: permission denied + # - EINVAL/ENAMETOOLONG: illegal chars, path too long + # Disk-full is handled specifically here so smart clients get a + # retryable ``disk_full`` code that matches the pre-flight branch + # earlier in this function. Using ``errno`` rather than substring + # matching on the error message — the message is locale-dependent + # and varies across OS versions, ``errno`` does not. if dest_file.exists(): with contextlib.suppress(OSError): dest_file.unlink() - error_str = str(e) - if "disk space" in error_str.lower() or "space" in error_str.lower(): - suggestion = "Free space on the local disk or choose a different destination." - else: - suggestion = ( + if e.errno == errno.ENOSPC: + error_response( + ErrorCode.DISK_FULL, + f"Download failed: No space left on local disk: {e}", + retryable=True, + suggestion="Free space on the local disk or choose a different destination.", + ) + error_response( + ErrorCode.FILESYSTEM_ERROR, + f"Download failed: Failed to write local file: {e}", + retryable=False, + suggestion=( "The filename may contain characters not allowed on this OS. " "Use the filename parameter to specify a compatible name." - ) - return format_error("Download", f"Failed to write local file: {e}", suggestion) + ), + ) except Exception: # Clean up partial file on unexpected failure if dest_file.exists(): diff --git a/src/mcp_synology/modules/system/info.py b/src/mcp_synology/modules/system/info.py index 86bf722..6e4a78f 100644 --- a/src/mcp_synology/modules/system/info.py +++ b/src/mcp_synology/modules/system/info.py @@ -5,8 +5,8 @@ import logging from typing import TYPE_CHECKING, Any -from mcp_synology.core.errors import SynologyError -from mcp_synology.core.formatting import format_error, format_key_value +from mcp_synology.core.errors import ErrorCode, SynologyError +from mcp_synology.core.formatting import error_response, format_key_value if TYPE_CHECKING: from mcp_synology.core.client import DsmClient @@ -60,10 +60,11 @@ async def get_system_info(client: DsmClient) -> str: core = await _fetch_core_system_info(client) if not dsm and not core: - return format_error( - "System info", - "No system information available.", - "Check that the user has permission to query system info.", + error_response( + ErrorCode.UNAVAILABLE, + "System info failed: No system information available.", + retryable=True, + suggestion="Check that the user has permission to query system info.", ) pairs: list[tuple[str, str]] = [] @@ -121,6 +122,10 @@ async def get_system_info(client: DsmClient) -> str: pairs.append(("NTP", ntp_server)) if not pairs: - return format_error("System info", "No system information returned.", None) + error_response( + ErrorCode.UNAVAILABLE, + "System info failed: No system information returned.", + retryable=True, + ) return format_key_value(pairs, title="System Info") diff --git a/src/mcp_synology/modules/system/utilization.py b/src/mcp_synology/modules/system/utilization.py index 9fb78fe..5e28be4 100644 --- a/src/mcp_synology/modules/system/utilization.py +++ b/src/mcp_synology/modules/system/utilization.py @@ -5,8 +5,13 @@ import logging from typing import TYPE_CHECKING, Any -from mcp_synology.core.errors import SynologyError -from mcp_synology.core.formatting import format_error, format_key_value, format_size +from mcp_synology.core.errors import ErrorCode, SynologyError +from mcp_synology.core.formatting import ( + error_response, + format_key_value, + format_size, + synology_error_response, +) if TYPE_CHECKING: from mcp_synology.core.client import DsmClient @@ -103,23 +108,26 @@ def _format_disk(disks: list[dict[str, Any]]) -> list[tuple[str, str]]: async def get_resource_usage(client: DsmClient) -> str: """Get live system resource utilization.""" if "SYNO.Core.System.Utilization" not in client._api_cache: - return format_error( - "Resource usage", - "SYNO.Core.System.Utilization API not available.", - "This API may require admin privileges or may not be available on this DSM version.", + error_response( + ErrorCode.API_NOT_FOUND, + "Resource usage failed: SYNO.Core.System.Utilization API not available.", + retryable=False, + suggestion="This API may require admin privileges or may not be available " + "on this DSM version.", ) try: data = await client.request("SYNO.Core.System.Utilization", "get") except SynologyError as e: if e.code == 105: - return format_error( - "Resource usage", - "Permission denied — admin account required.", - "The SYNO.Core.System.Utilization API requires an admin DSM account. " + error_response( + ErrorCode.PERMISSION_DENIED, + "Resource usage failed: Permission denied — admin account required.", + retryable=False, + suggestion="The SYNO.Core.System.Utilization API requires an admin DSM account. " "Configure an admin connection or check DSM user permissions.", ) - return format_error("Resource usage", str(e), e.suggestion) + synology_error_response("Resource usage", e) pairs: list[tuple[str, str]] = [] @@ -150,10 +158,16 @@ async def get_resource_usage(client: DsmClient) -> str: pairs.extend(_format_disk(disk_list)) if not pairs: - return format_error( - "Resource usage", - "No utilization data returned.", - "The API returned successfully but no metrics were populated.", + # retryable=True matches the other ``unavailable`` call sites in + # modules/system/info.py and the PR's error-code table. The API + # responded successfully but populated nothing, which is more + # likely transient (service warming up, monitor not ready after + # reboot) than a permanent condition. + error_response( + ErrorCode.UNAVAILABLE, + "Resource usage failed: No utilization data returned.", + retryable=True, + suggestion="The API returned successfully but no metrics were populated.", ) return format_key_value(pairs, title="Resource Usage") diff --git a/tests/core/test_formatting.py b/tests/core/test_formatting.py index 3b29b06..b8a2453 100644 --- a/tests/core/test_formatting.py +++ b/tests/core/test_formatting.py @@ -1,7 +1,19 @@ """Tests for core/formatting.py — all shared formatters.""" +import json + +import pytest +from mcp.server.fastmcp.exceptions import ToolError + +from mcp_synology.core.errors import ( + ErrorCode, + FileStationError, + PathNotFoundError, + SynologyError, +) from mcp_synology.core.formatting import ( TreeNode, + error_response, format_error, format_key_value, format_size, @@ -9,6 +21,7 @@ format_table, format_timestamp, format_tree, + synology_error_response, ) @@ -144,3 +157,115 @@ def test_known_timestamp(self) -> None: # 2025-03-15 12:00:00 UTC result = format_timestamp(1742040000) assert "2025-03-15" in result + + +class TestErrorResponse: + def test_raises_tool_error(self) -> None: + with pytest.raises(ToolError) as exc_info: + error_response(ErrorCode.NOT_FOUND, "File not found", retryable=False) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "not_found" + assert body["error"]["message"] == "File not found" + assert body["error"]["retryable"] is False + + def test_includes_optional_fields(self) -> None: + with pytest.raises(ToolError) as exc_info: + error_response( + ErrorCode.INVALID_PARAMETER, + "Bad param", + retryable=False, + param="path", + value="/bad", + valid=["/good1", "/good2"], + suggestion="Try a valid path.", + help_url="https://example.com", + ) + body = json.loads(str(exc_info.value)) + err = body["error"] + assert err["param"] == "path" + assert err["value"] == "/bad" + assert err["valid"] == ["/good1", "/good2"] + assert err["suggestion"] == "Try a valid path." + assert err["help_url"] == "https://example.com" + + def test_omits_optional_fields_when_none(self) -> None: + # SESSION_EXPIRED is the one ErrorCode member intentionally absent + # from HELP_URLS (auto-retried, never surfaced to users), so it is + # the single legitimate way to exercise the "no help_url field" + # omission path under the tightened ``code: ErrorCode`` signature. + with pytest.raises(ToolError) as exc_info: + error_response(ErrorCode.SESSION_EXPIRED, "Something broke", retryable=True) + body = json.loads(str(exc_info.value)) + err = body["error"] + assert "param" not in err + assert "value" not in err + assert "valid" not in err + assert "suggestion" not in err + assert "help_url" not in err + assert err["retryable"] is True + + def test_help_url_auto_populated_from_registry(self) -> None: + # Every code registered in HELP_URLS should be auto-populated + # without the caller having to pass help_url explicitly. + with pytest.raises(ToolError) as exc_info: + error_response(ErrorCode.DSM_ERROR, "Something broke", retryable=True) + body = json.loads(str(exc_info.value)) + assert "help_url" in body["error"] + assert body["error"]["help_url"].endswith("#dsm_error") + + def test_help_url_explicit_override_wins(self) -> None: + # An explicit help_url argument must override the registered default + # so callers can point at something specific when needed. + with pytest.raises(ToolError) as exc_info: + error_response( + ErrorCode.DSM_ERROR, + "Something broke", + retryable=True, + help_url="https://example.com/custom", + ) + body = json.loads(str(exc_info.value)) + assert body["error"]["help_url"] == "https://example.com/custom" + + +class TestSynologyErrorResponse: + def test_maps_path_not_found(self) -> None: + exc = PathNotFoundError("No such file", code=408, suggestion="Check path") + with pytest.raises(ToolError) as exc_info: + synology_error_response("List files", exc) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "not_found" + assert body["error"]["retryable"] is False + assert "List files" in body["error"]["message"] + assert "408" in body["error"]["message"] + assert body["error"]["suggestion"] == "Check path" + + def test_maps_generic_synology_error(self) -> None: + exc = SynologyError("Unknown", code=100) + with pytest.raises(ToolError) as exc_info: + synology_error_response("Operation", exc) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "dsm_error" + + def test_maps_filestation_error(self) -> None: + exc = FileStationError("FS error", code=401) + with pytest.raises(ToolError) as exc_info: + synology_error_response("Copy", exc) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "filestation_error" + + def test_includes_help_url(self) -> None: + exc = PathNotFoundError("Not found", code=408) + with pytest.raises(ToolError) as exc_info: + synology_error_response("Get info", exc) + body = json.loads(str(exc_info.value)) + assert body["error"]["help_url"] is not None + + def test_no_code(self) -> None: + exc = SynologyError("Network timeout") + with pytest.raises(ToolError) as exc_info: + synology_error_response("Upload", exc) + body = json.loads(str(exc_info.value)) + # When no code, message should not contain "DSM error" + assert "DSM error" not in body["error"]["message"] + assert "Upload failed" in body["error"]["message"] diff --git a/tests/core/test_help_urls.py b/tests/core/test_help_urls.py new file mode 100644 index 0000000..445ce02 --- /dev/null +++ b/tests/core/test_help_urls.py @@ -0,0 +1,164 @@ +"""Validate that every help_url points to a real section of error-codes.md. + +The central HELP_URLS registry in ``core.errors`` must stay in lockstep with +``docs/error-codes.md``. If a section is renamed or removed without updating +the registry (or vice versa), users following a help_url from an error +envelope will land on a dead link. + +These tests enforce both directions of that mapping: + +1. Every registered help URL must resolve to a real ``## anchor`` in the doc. +2. Every ``## anchor`` in the doc must correspond to a registered code — this + catches orphaned sections that nothing links to. +3. The anchor portion of each URL must literally equal its error code, so a + grep for the code immediately finds the right section. +4. Every SynologyError subclass's ``error_code`` must either be in HELP_URLS + or be explicitly exempt (e.g. ``session_expired``, which is auto-retried + and never surfaced to users). +""" + +from __future__ import annotations + +import re +from pathlib import Path + +import pytest + +from mcp_synology.core.errors import ( + GITHUB_DOCS_BASE, + HELP_URLS, + ErrorCode, + SynologyError, +) + +REPO_ROOT = Path(__file__).resolve().parents[2] +DOCS_PATH = REPO_ROOT / "docs" / "error-codes.md" + +# Error codes that intentionally have no help_url entry. Keep this list +# tight — every addition needs a justification, because "no help" is a +# user-facing regression unless there is a real reason the code cannot +# be surfaced. +EXEMPT_CODES: frozenset[ErrorCode] = frozenset( + { + # Auto-retried in the core client. A surfaced session_expired would + # mean the retry itself failed, which is reported under the + # underlying failure's code, not under session_expired. + ErrorCode.SESSION_EXPIRED, + } +) + + +def _extract_h2_anchors(text: str) -> set[str]: + """Return the set of ``## heading`` anchors from a markdown document. + + Only H2 headings are treated as error-code anchors. H1 is reserved for + the page title and H3 for sub-sections within a code (if we ever add them), + so a strict H2 rule keeps the anchor namespace unambiguous. + """ + return set(re.findall(r"^## (\S+)\s*$", text, re.MULTILINE)) + + +def _all_synology_error_subclasses() -> list[type[SynologyError]]: + """Recursively enumerate every SynologyError subclass.""" + seen: list[type[SynologyError]] = [] + + def walk(cls: type[SynologyError]) -> None: + for sub in cls.__subclasses__(): + seen.append(sub) + walk(sub) + + walk(SynologyError) + return seen + + +@pytest.fixture(scope="module") +def anchors() -> set[str]: + assert DOCS_PATH.is_file(), f"Troubleshooting doc missing at {DOCS_PATH}" + return _extract_h2_anchors(DOCS_PATH.read_text()) + + +class TestHelpUrlsResolveToRealAnchors: + def test_docs_file_exists(self) -> None: + assert DOCS_PATH.is_file(), f"{DOCS_PATH} not found" + + def test_every_registered_code_has_matching_anchor(self, anchors: set[str]) -> None: + missing = sorted(code for code in HELP_URLS if code not in anchors) + assert not missing, ( + f"Codes in HELP_URLS have no matching `## ` heading in " + f"{DOCS_PATH.name}: {missing}. Either add the section or remove " + f"the HELP_URLS entry." + ) + + def test_every_url_points_at_troubleshooting_doc(self) -> None: + wrong = sorted( + (code, url) + for code, url in HELP_URLS.items() + if not url.startswith(GITHUB_DOCS_BASE + "#") + ) + assert not wrong, ( + f"HELP_URLS entries must point at {GITHUB_DOCS_BASE}#. Wrong: {wrong}" + ) + + def test_url_anchor_equals_error_code(self) -> None: + mismatched: list[tuple[str, str]] = [] + for code, url in HELP_URLS.items(): + anchor = url.rsplit("#", 1)[-1] + if anchor != code: + mismatched.append((code, anchor)) + assert not mismatched, ( + f"Anchor must literally equal the error code for grep-ability: {mismatched}" + ) + + def test_no_orphan_sections(self, anchors: set[str]) -> None: + orphans = sorted(anchors - set(HELP_URLS.keys())) + assert not orphans, ( + f"{DOCS_PATH.name} has `## ` headings with no matching " + f"HELP_URLS entry: {orphans}. Either register the code or remove " + f"the section." + ) + + +class TestSynologyErrorSubclassCoverage: + def test_every_subclass_error_code_is_registered_or_exempt(self) -> None: + uncovered: list[tuple[str, ErrorCode]] = [] + for cls in _all_synology_error_subclasses(): + code = cls.error_code + if code in HELP_URLS or code in EXEMPT_CODES: + continue + uncovered.append((cls.__name__, code)) + assert not uncovered, ( + f"SynologyError subclasses have an error_code with no HELP_URLS " + f"entry and no exemption: {uncovered}. Add the code to HELP_URLS " + f"(and a section to error-codes.md) or justify it in " + f"EXEMPT_CODES in this test file." + ) + + def test_base_error_code_is_registered(self) -> None: + # SynologyError itself uses dsm_error — the catch-all code. That + # must always be registered, since any unhandled path ends up there. + assert SynologyError.error_code == ErrorCode.DSM_ERROR + assert ErrorCode.DSM_ERROR in HELP_URLS + + +class TestErrorCodeEnumCoverage: + """Every ErrorCode member is either documented or explicitly exempt.""" + + def test_every_enum_member_is_registered_or_exempt(self) -> None: + uncovered = [ + code for code in ErrorCode if code not in HELP_URLS and code not in EXEMPT_CODES + ] + assert not uncovered, ( + f"ErrorCode members have no HELP_URLS entry and no exemption: " + f"{[c.value for c in uncovered]}. Add a section to error-codes.md " + f"or justify the omission in EXEMPT_CODES." + ) + + def test_help_urls_keys_are_all_valid_error_codes(self) -> None: + # Catch accidental string drift: every HELP_URLS key must be an + # actual ErrorCode value (not a typo or renamed code). + valid_values = {c.value for c in ErrorCode} + invalid = sorted(set(HELP_URLS.keys()) - valid_values) + assert not invalid, ( + f"HELP_URLS has keys that are not ErrorCode values: {invalid}. " + f"Add them to ErrorCode or remove from HELP_URLS." + ) diff --git a/tests/modules/filestation/test_listing.py b/tests/modules/filestation/test_listing.py index f0e1398..07ba347 100644 --- a/tests/modules/filestation/test_listing.py +++ b/tests/modules/filestation/test_listing.py @@ -2,9 +2,12 @@ from __future__ import annotations +import json from typing import TYPE_CHECKING +import pytest import respx +from mcp.server.fastmcp.exceptions import ToolError from mcp_synology.modules.filestation.listing import ( list_files, @@ -86,6 +89,18 @@ async def test_list_shares_empty(self, mock_client: DsmClient) -> None: result = await list_shares(mock_client) assert "No items" in result + @respx.mock + async def test_list_shares_error(self, mock_client: DsmClient) -> None: + """DSM error on list_shares should propagate as a structured envelope.""" + respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( + json={"success": False, "error": {"code": 105}} + ) + with pytest.raises(ToolError) as exc_info: + await list_shares(mock_client) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "permission_denied" + class TestListFiles: @respx.mock @@ -159,8 +174,11 @@ async def test_list_files_error(self, mock_client: DsmClient) -> None: respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( json={"success": False, "error": {"code": 408}} ) - result = await list_files(mock_client, path="/nonexistent") - assert "[!]" in result + with pytest.raises(ToolError) as exc_info: + await list_files(mock_client, path="/nonexistent") + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "not_found" class TestListRecycleBin: diff --git a/tests/modules/filestation/test_metadata.py b/tests/modules/filestation/test_metadata.py index 58c2516..7d62263 100644 --- a/tests/modules/filestation/test_metadata.py +++ b/tests/modules/filestation/test_metadata.py @@ -2,10 +2,13 @@ from __future__ import annotations +import json from typing import TYPE_CHECKING import httpx +import pytest import respx +from mcp.server.fastmcp.exceptions import ToolError from mcp_synology.modules.filestation.metadata import get_dir_size, get_file_info from tests.conftest import BASE_URL @@ -79,8 +82,31 @@ async def test_error(self, mock_client: DsmClient) -> None: respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( json={"success": False, "error": {"code": 408}} ) - result = await get_file_info(mock_client, paths=["/nonexistent"]) - assert "[!]" in result + with pytest.raises(ToolError) as exc_info: + await get_file_info(mock_client, paths=["/nonexistent"]) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "not_found" + + @respx.mock + async def test_empty_files_list_returns_not_found(self, mock_client: DsmClient) -> None: + """getinfo succeeds but returns no files → not_found. + + DSM returns ``success=true, files=[]`` for multi-path requests where + every path is missing or unreadable. The tool converts this to + not_found so clients don't have to special-case empty success. + """ + respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( + json={"success": True, "data": {"files": []}} + ) + with pytest.raises(ToolError) as exc_info: + await get_file_info( + mock_client, + paths=["/video/missing1", "/video/missing2"], + ) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "not_found" + assert body["error"]["retryable"] is False class TestGetDirSize: @@ -116,3 +142,65 @@ def side_effect(request: httpx.Request) -> httpx.Response: assert "42.6 GB" in result assert "186" in result assert "12" in result + + @respx.mock + async def test_dir_size_start_error(self, mock_client: DsmClient) -> None: + """DSM error on the start call should propagate as a structured error.""" + respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( + json={"success": False, "error": {"code": 408}} + ) + with pytest.raises(ToolError) as exc_info: + await get_dir_size(mock_client, path="/nonexistent") + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "not_found" + + @respx.mock + async def test_dir_size_poll_error(self, mock_client: DsmClient) -> None: + """DSM error mid-poll should still clean up the task and raise. + + The task is started successfully but the first status call fails. + The ``try/finally`` must still invoke stop/clean, and the error + must be surfaced as a structured envelope (filestation_error for + code 402, "System too busy"). + """ + + def side_effect(request: httpx.Request) -> httpx.Response: + params = dict(request.url.params) + method = params.get("method", "") + if method == "start": + return httpx.Response(200, json={"success": True, "data": {"taskid": "ds-2"}}) + if method == "status": + return httpx.Response(200, json={"success": False, "error": {"code": 402}}) + # stop/clean + return httpx.Response(200, json={"success": True, "data": {}}) + + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock(side_effect=side_effect) + + with pytest.raises(ToolError) as exc_info: + await get_dir_size(mock_client, path="/video/busy") + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "filestation_error" + + @respx.mock + async def test_dir_size_timeout(self, mock_client: DsmClient) -> None: + """Polling never returns finished → timeout error, retryable=True.""" + + def side_effect(request: httpx.Request) -> httpx.Response: + params = dict(request.url.params) + method = params.get("method", "") + if method == "start": + return httpx.Response(200, json={"success": True, "data": {"taskid": "ds-3"}}) + if method == "status": + # Never mark finished — force the polling loop to exhaust + # its budget. + return httpx.Response(200, json={"success": True, "data": {"finished": False}}) + return httpx.Response(200, json={"success": True, "data": {}}) + + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock(side_effect=side_effect) + + with pytest.raises(ToolError) as exc_info: + # Tight timeout so the test runs fast. + await get_dir_size(mock_client, path="/video/huge", timeout=1.0) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "timeout" + assert body["error"]["retryable"] is True diff --git a/tests/modules/filestation/test_operations.py b/tests/modules/filestation/test_operations.py index 9645b05..9bdf517 100644 --- a/tests/modules/filestation/test_operations.py +++ b/tests/modules/filestation/test_operations.py @@ -2,10 +2,13 @@ from __future__ import annotations +import json from typing import TYPE_CHECKING import httpx +import pytest import respx +from mcp.server.fastmcp.exceptions import ToolError from mcp_synology.modules.filestation.operations import ( copy_files, @@ -65,8 +68,11 @@ async def test_create_folder_error(self, mock_client: DsmClient) -> None: respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( json={"success": False, "error": {"code": 418}} ) - result = await create_folder(mock_client, paths=["/video/bad None: assert "Severance" in result async def test_rename_rejects_path_in_name(self, mock_client: DsmClient) -> None: - result = await rename(mock_client, path="/video/test", new_name="some/path/name") - assert "[!]" in result - assert "just a filename" in result + with pytest.raises(ToolError) as exc_info: + await rename(mock_client, path="/video/test", new_name="some/path/name") + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "invalid_parameter" + assert body["error"]["param"] == "new_name" @respx.mock async def test_rename_error(self, mock_client: DsmClient) -> None: respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( json={"success": False, "error": {"code": 419}} ) - result = await rename(mock_client, path="/video/test", new_name="bad None: respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( json={"success": False, "error": {"code": 414}} ) - result = await copy_files( - mock_client, - paths=["/video/file.mkv"], - dest_folder="/video/dest", - ) - assert "[!]" in result + with pytest.raises(ToolError) as exc_info: + await copy_files( + mock_client, + paths=["/video/file.mkv"], + dest_folder="/video/dest", + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "already_exists" class TestMoveFiles: @@ -147,13 +162,15 @@ async def test_move_conflict(self, mock_client: DsmClient) -> None: respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( json={"success": False, "error": {"code": 414}} ) - result = await move_files( - mock_client, - paths=["/video/file.mkv"], - dest_folder="/video/dest", - ) - assert "[!]" in result - assert "exists" in result.lower() + with pytest.raises(ToolError) as exc_info: + await move_files( + mock_client, + paths=["/video/file.mkv"], + dest_folder="/video/dest", + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "already_exists" class TestDeleteFiles: @@ -185,8 +202,11 @@ async def test_delete_error(self, mock_client: DsmClient) -> None: respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( json={"success": False, "error": {"code": 408}} ) - result = await delete_files(mock_client, paths=["/nonexistent/file"]) - assert "[!]" in result + with pytest.raises(ToolError) as exc_info: + await delete_files(mock_client, paths=["/nonexistent/file"]) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "not_found" @respx.mock async def test_delete_multiple_shares(self, mock_client: DsmClient) -> None: @@ -200,6 +220,187 @@ async def test_delete_multiple_shares(self, mock_client: DsmClient) -> None: assert "NOT enabled" in result +class TestBackgroundTaskErrors: + """Error paths shared across CopyMove and Delete background tasks. + + These exercise the polling-loop branches that previously had no + coverage: mid-poll errors, timeouts, and task-completion error dicts. + """ + + @respx.mock + async def test_copy_timeout(self, mock_client: DsmClient) -> None: + """Copy task that never finishes within timeout → timeout error.""" + + def side_effect(request: httpx.Request) -> httpx.Response: + params = dict(request.url.params) + method = params.get("method", "") + if method == "start": + return httpx.Response(200, json={"success": True, "data": {"taskid": "cm-1"}}) + if method == "status": + return httpx.Response(200, json={"success": True, "data": {"finished": False}}) + return httpx.Response(200, json={"success": True, "data": {}}) + + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock(side_effect=side_effect) + + with pytest.raises(ToolError) as exc_info: + await copy_files( + mock_client, + paths=["/video/huge.mkv"], + dest_folder="/video/Archive", + timeout=1.0, + ) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "timeout" + assert body["error"]["retryable"] is True + assert "Copy files" in body["error"]["message"] + + @respx.mock + async def test_copy_task_completes_with_error(self, mock_client: DsmClient) -> None: + """Copy task finishes but status has an ``error`` key → dsm_error.""" + + def side_effect(request: httpx.Request) -> httpx.Response: + params = dict(request.url.params) + method = params.get("method", "") + if method == "start": + return httpx.Response(200, json={"success": True, "data": {"taskid": "cm-2"}}) + if method == "status": + return httpx.Response( + 200, + json={ + "success": True, + "data": { + "finished": True, + "error": {"code": 1100}, + "path": "/video/restricted/file.mkv", + }, + }, + ) + return httpx.Response(200, json={"success": True, "data": {}}) + + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock(side_effect=side_effect) + + with pytest.raises(ToolError) as exc_info: + await copy_files( + mock_client, + paths=["/video/restricted/file.mkv"], + dest_folder="/video/Archive", + ) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "dsm_error" + assert "1100" in body["error"]["message"] + assert "/video/restricted/file.mkv" in body["error"]["message"] + + @respx.mock + async def test_copy_poll_error_mid_operation(self, mock_client: DsmClient) -> None: + """DSM fails mid-poll → error propagates via synology_error_response.""" + state = {"calls": 0} + + def side_effect(request: httpx.Request) -> httpx.Response: + state["calls"] += 1 + params = dict(request.url.params) + method = params.get("method", "") + if method == "start": + return httpx.Response(200, json={"success": True, "data": {"taskid": "cm-3"}}) + if method == "status": + # Fail on the first status call + return httpx.Response(200, json={"success": False, "error": {"code": 402}}) + return httpx.Response(200, json={"success": True, "data": {}}) + + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock(side_effect=side_effect) + + with pytest.raises(ToolError) as exc_info: + await copy_files( + mock_client, + paths=["/video/file.mkv"], + dest_folder="/video/dest", + ) + body = json.loads(str(exc_info.value)) + # Code 402 "System too busy" is not specifically typed → filestation_error + assert body["error"]["code"] == "filestation_error" + + @respx.mock + async def test_delete_timeout(self, mock_client: DsmClient) -> None: + """Delete task that never finishes → timeout error.""" + + def side_effect(request: httpx.Request) -> httpx.Response: + params = dict(request.url.params) + method = params.get("method", "") + if method == "start": + return httpx.Response(200, json={"success": True, "data": {"taskid": "del-1"}}) + if method == "status": + return httpx.Response(200, json={"success": True, "data": {"finished": False}}) + return httpx.Response(200, json={"success": True, "data": {}}) + + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock(side_effect=side_effect) + + with pytest.raises(ToolError) as exc_info: + await delete_files( + mock_client, + paths=["/video/huge_dir"], + timeout=1.0, + ) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "timeout" + assert body["error"]["retryable"] is True + assert "Delete files" in body["error"]["message"] + + @respx.mock + async def test_delete_poll_error_mid_operation(self, mock_client: DsmClient) -> None: + """DSM fails on status call during delete polling. + + Covers the ``poll_error = e; break`` branch in delete_files, + mirroring the copymove version above. Previously uncovered in + the patch. + """ + + def side_effect(request: httpx.Request) -> httpx.Response: + params = dict(request.url.params) + method = params.get("method", "") + if method == "start": + return httpx.Response(200, json={"success": True, "data": {"taskid": "del-err"}}) + if method == "status": + return httpx.Response(200, json={"success": False, "error": {"code": 402}}) + return httpx.Response(200, json={"success": True, "data": {}}) + + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock(side_effect=side_effect) + + with pytest.raises(ToolError) as exc_info: + await delete_files(mock_client, paths=["/video/file.mkv"]) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "filestation_error" + + @respx.mock + async def test_delete_task_completes_with_error(self, mock_client: DsmClient) -> None: + """Delete task finishes with an error dict → dsm_error.""" + + def side_effect(request: httpx.Request) -> httpx.Response: + params = dict(request.url.params) + method = params.get("method", "") + if method == "start": + return httpx.Response(200, json={"success": True, "data": {"taskid": "del-2"}}) + if method == "status": + return httpx.Response( + 200, + json={ + "success": True, + "data": { + "finished": True, + "error": {"code": 1100}, + "path": "/video/locked/file.mkv", + }, + }, + ) + return httpx.Response(200, json={"success": True, "data": {}}) + + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock(side_effect=side_effect) + + with pytest.raises(ToolError) as exc_info: + await delete_files(mock_client, paths=["/video/locked/file.mkv"]) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "dsm_error" + assert "1100" in body["error"]["message"] + + class TestRestoreFromRecycleBin: @respx.mock async def test_restore_success(self, mock_client: DsmClient) -> None: @@ -237,9 +438,12 @@ async def test_restore_error(self, mock_client: DsmClient) -> None: respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( json={"success": False, "error": {"code": 408}} ) - result = await restore_from_recycle_bin( - mock_client, - share="video", - paths=["nonexistent.mkv"], - ) - assert "[!]" in result + with pytest.raises(ToolError) as exc_info: + await restore_from_recycle_bin( + mock_client, + share="video", + paths=["nonexistent.mkv"], + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "not_found" diff --git a/tests/modules/filestation/test_search.py b/tests/modules/filestation/test_search.py index c6b9a6e..ab341ee 100644 --- a/tests/modules/filestation/test_search.py +++ b/tests/modules/filestation/test_search.py @@ -2,10 +2,13 @@ from __future__ import annotations +import json from typing import TYPE_CHECKING import httpx +import pytest import respx +from mcp.server.fastmcp.exceptions import ToolError from mcp_synology.modules.filestation.search import search_files from tests.conftest import BASE_URL @@ -135,8 +138,41 @@ async def test_search_error(self, mock_client: DsmClient) -> None: respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( json={"success": False, "error": {"code": 408}} ) - result = await search_files(mock_client, folder_path="/nonexistent") - assert "[!]" in result + with pytest.raises(ToolError) as exc_info: + await search_files(mock_client, folder_path="/nonexistent") + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "not_found" + + @respx.mock + async def test_search_poll_error_mid_operation(self, mock_client: DsmClient) -> None: + """DSM fails on the status/list call after search task was started. + + Exercises the ``poll_error = e`` branch and the subsequent + ``synology_error_response("Search files", poll_error)`` after + the try/finally cleanup runs. Prior to this test, those lines + were uncovered in the patch. + """ + + def side_effect(request: httpx.Request) -> httpx.Response: + params = dict(request.url.params) + method = params.get("method", "") + if method == "start": + return httpx.Response(200, json={"success": True, "data": {"taskid": "search-err"}}) + if method == "list": + # First list call fails — DSM busy + return httpx.Response(200, json={"success": False, "error": {"code": 402}}) + # stop/clean + return httpx.Response(200, json={"success": True, "data": {}}) + + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock(side_effect=side_effect) + + with pytest.raises(ToolError) as exc_info: + await search_files(mock_client, folder_path="/video", pattern="test") + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + # Code 402 "System too busy" is not specifically typed → filestation_error + assert body["error"]["code"] == "filestation_error" @respx.mock async def test_search_with_size_filter(self, mock_client: DsmClient) -> None: diff --git a/tests/modules/filestation/test_transfer.py b/tests/modules/filestation/test_transfer.py index a35be25..dc6aed3 100644 --- a/tests/modules/filestation/test_transfer.py +++ b/tests/modules/filestation/test_transfer.py @@ -3,11 +3,15 @@ from __future__ import annotations import contextlib +import errno +import json from typing import TYPE_CHECKING from unittest.mock import patch import httpx +import pytest import respx +from mcp.server.fastmcp.exceptions import ToolError from mcp_synology.modules.filestation.transfer import ( download_file, @@ -39,13 +43,15 @@ async def test_upload_success(self, mock_client: DsmClient, tmp_path: Path) -> N assert "/video/uploads/" in result async def test_upload_local_file_not_found(self, mock_client: DsmClient) -> None: - result = await upload_file( - mock_client, - local_path="/nonexistent/file.txt", - dest_folder="/video/uploads", - ) - assert "[!]" in result - assert "not found" in result.lower() + with pytest.raises(ToolError) as exc_info: + await upload_file( + mock_client, + local_path="/nonexistent/file.txt", + dest_folder="/video/uploads", + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "not_found" @respx.mock async def test_upload_file_exists_no_overwrite( @@ -58,14 +64,16 @@ async def test_upload_file_exists_no_overwrite( json={"success": False, "error": {"code": 414}} ) - result = await upload_file( - mock_client, - local_path=str(local_file), - dest_folder="/video/uploads", - ) - assert "[!]" in result - assert "already exists" in result - assert "overwrite=true" in result + with pytest.raises(ToolError) as exc_info: + await upload_file( + mock_client, + local_path=str(local_file), + dest_folder="/video/uploads", + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "already_exists" + assert "overwrite=true" in body["error"]["suggestion"] @respx.mock async def test_upload_overwrite_success(self, mock_client: DsmClient, tmp_path: Path) -> None: @@ -108,31 +116,38 @@ async def test_upload_dsm_error(self, mock_client: DsmClient, tmp_path: Path) -> json={"success": False, "error": {"code": 1802}} ) - result = await upload_file( - mock_client, - local_path=str(local_file), - dest_folder="/video/uploads", - ) - assert "[!]" in result - assert "Upload" in result + with pytest.raises(ToolError) as exc_info: + await upload_file( + mock_client, + local_path=str(local_file), + dest_folder="/video/uploads", + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert "Upload" in body["error"]["message"] async def test_upload_local_file_permission_error( self, mock_client: DsmClient, tmp_path: Path ) -> None: - """OSError reading local file should return a formatted error.""" + """OSError reading local file should raise ToolError.""" local_file = tmp_path / "no_read.txt" local_file.write_text("hello") local_file.chmod(0o000) - result = await upload_file( - mock_client, - local_path=str(local_file), - dest_folder="/video/uploads", - ) - # Restore permissions for cleanup - local_file.chmod(0o644) - assert "[!]" in result - assert "permission" in result.lower() or "not found" in result.lower() + try: + with pytest.raises(ToolError) as exc_info: + await upload_file( + mock_client, + local_path=str(local_file), + dest_folder="/video/uploads", + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + # Could be "not_found" (can't stat) or "filesystem_error" (can't read) + assert body["error"]["code"] in ("not_found", "filesystem_error") + finally: + # Restore permissions for cleanup + local_file.chmod(0o644) class TestDownloadFile: @@ -157,13 +172,15 @@ async def test_download_success(self, mock_client: DsmClient, tmp_path: Path) -> assert downloaded.read_bytes() == file_content async def test_download_local_dir_not_found(self, mock_client: DsmClient) -> None: - result = await download_file( - mock_client, - path="/video/movie.mkv", - dest_folder="/nonexistent/dir", - ) - assert "[!]" in result - assert "not found" in result.lower() + with pytest.raises(ToolError) as exc_info: + await download_file( + mock_client, + path="/video/movie.mkv", + dest_folder="/nonexistent/dir", + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "not_found" async def test_download_file_exists_no_overwrite( self, mock_client: DsmClient, tmp_path: Path @@ -171,14 +188,16 @@ async def test_download_file_exists_no_overwrite( existing = tmp_path / "movie.mkv" existing.write_text("existing") - result = await download_file( - mock_client, - path="/video/movie.mkv", - dest_folder=str(tmp_path), - ) - assert "[!]" in result - assert "already exists" in result - assert "overwrite=true" in result + with pytest.raises(ToolError) as exc_info: + await download_file( + mock_client, + path="/video/movie.mkv", + dest_folder=str(tmp_path), + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "already_exists" + assert "overwrite=true" in body["error"]["suggestion"] @respx.mock async def test_download_overwrite_success(self, mock_client: DsmClient, tmp_path: Path) -> None: @@ -226,13 +245,15 @@ async def test_download_dsm_error_response( headers={"content-type": "application/json"}, ) - result = await download_file( - mock_client, - path="/video/nonexistent.mkv", - dest_folder=str(tmp_path), - ) - assert "[!]" in result - assert "Download" in result + with pytest.raises(ToolError) as exc_info: + await download_file( + mock_client, + path="/video/nonexistent.mkv", + dest_folder=str(tmp_path), + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert "Download" in body["error"]["message"] # Partial file should not exist assert not (tmp_path / "nonexistent.mkv").exists() @@ -257,7 +278,7 @@ async def test_download_partial_cleanup(self, mock_client: DsmClient, tmp_path: async def test_download_write_permission_error( self, mock_client: DsmClient, tmp_path: Path ) -> None: - """OSError writing local file should return a formatted error. + """OSError writing local file should raise ToolError. Simulates what happens when the filename contains OS-illegal characters (e.g., ':' on Windows) or the directory is read-only, by attempting to @@ -272,16 +293,20 @@ async def test_download_write_permission_error( headers={"content-type": "application/octet-stream"}, ) - result = await download_file( - mock_client, - path="/video/file.mkv", - dest_folder=str(readonly_dir), - filename="test.mkv", - ) - # Restore permissions for cleanup - readonly_dir.chmod(0o755) - assert "[!]" in result - assert "filename" in result.lower() + try: + with pytest.raises(ToolError) as exc_info: + await download_file( + mock_client, + path="/video/file.mkv", + dest_folder=str(readonly_dir), + filename="test.mkv", + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "filesystem_error" + finally: + # Restore permissions for cleanup + readonly_dir.chmod(0o755) @respx.mock async def test_download_insufficient_disk_space_preflight( @@ -310,13 +335,71 @@ async def test_download_insufficient_disk_space_preflight( ] ) - result = await download_file( - mock_client, - path="/video/huge.mkv", - dest_folder=str(tmp_path), - ) - assert "[!]" in result - assert "disk space" in result.lower() + with pytest.raises(ToolError) as exc_info: + await download_file( + mock_client, + path="/video/huge.mkv", + dest_folder=str(tmp_path), + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "disk_full" + + async def test_download_enospc_reports_disk_full( + self, mock_client: DsmClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """OSError with errno=ENOSPC during write emits disk_full, not filesystem_error. + + The pre-flight branch catches most disk-full cases up front using + ``shutil.disk_usage``. The OSError fallback covers races (disk + filled between the pre-flight and the actual write) or cases + where the pre-flight was skipped because getinfo failed. Both + branches must emit the same code so smart clients dispatch + consistently — this test exercises the fallback path. + """ + + async def _raise_enospc(*args: object, **kwargs: object) -> int: + raise OSError(errno.ENOSPC, "No space left on device") + + monkeypatch.setattr(mock_client, "download", _raise_enospc) + + with pytest.raises(ToolError) as exc_info: + await download_file( + mock_client, + path="/video/file.mkv", + dest_folder=str(tmp_path), + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "disk_full" + assert body["error"]["retryable"] is True + assert "No space left" in body["error"]["message"] + + async def test_download_non_enospc_oserror_reports_filesystem_error( + self, mock_client: DsmClient, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + """OSError with any errno other than ENOSPC falls through to filesystem_error. + + Complements the ENOSPC test above by exercising the other side + of the errno branch. Uses EACCES (permission denied) rather than + relying on a readonly directory so the test is independent of + filesystem semantics. + """ + + async def _raise_eacces(*args: object, **kwargs: object) -> int: + raise OSError(errno.EACCES, "Permission denied") + + monkeypatch.setattr(mock_client, "download", _raise_eacces) + + with pytest.raises(ToolError) as exc_info: + await download_file( + mock_client, + path="/video/file.mkv", + dest_folder=str(tmp_path), + ) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "filesystem_error" + assert body["error"]["retryable"] is False @respx.mock async def test_download_progress_callback(self, mock_client: DsmClient, tmp_path: Path) -> None: diff --git a/tests/modules/system/__init__.py b/tests/modules/system/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/modules/system/test_info.py b/tests/modules/system/test_info.py new file mode 100644 index 0000000..541130f --- /dev/null +++ b/tests/modules/system/test_info.py @@ -0,0 +1,237 @@ +"""Tests for modules/system/info.py — get_system_info.""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Any + +import httpx +import pytest +import respx +from mcp.server.fastmcp.exceptions import ToolError + +from mcp_synology.core.state import ApiInfoEntry +from mcp_synology.modules.system.info import get_system_info +from tests.conftest import BASE_URL + +if TYPE_CHECKING: + from mcp_synology.core.client import DsmClient + + +def _install_system_apis(client: DsmClient) -> None: + """Add SYNO.DSM.Info and SYNO.Core.System to the client's API cache.""" + client._api_cache["SYNO.DSM.Info"] = ApiInfoEntry( + path="entry.cgi", min_version=1, max_version=3 + ) + client._api_cache["SYNO.Core.System"] = ApiInfoEntry( + path="entry.cgi", min_version=1, max_version=3 + ) + + +def _mock_response(request: httpx.Request, responses: dict[str, dict[str, Any]]) -> httpx.Response: + """Return a canned response keyed on the ``api`` query param.""" + api = dict(request.url.params).get("api", "") + payload = responses.get(api, {"success": False, "error": {"code": 100}}) + return httpx.Response(200, json=payload) + + +class TestGetSystemInfo: + @respx.mock + async def test_success_with_both_sources(self, mock_client: DsmClient) -> None: + """Happy path: DSM.Info + Core.System both return data, output merges them.""" + _install_system_apis(mock_client) + responses = { + "SYNO.DSM.Info": { + "success": True, + "data": { + "model": "DS920+", + "version_string": "DSM 7.2.1-69057", + "ram": 8192, + "temperature": 42, + "uptime": 123456, + "time": "2026-04-10 12:00:00", + }, + }, + "SYNO.Core.System": { + "success": True, + "data": { + "cpu_series": "Intel Celeron J4125", + "cpu_cores": "4", + "cpu_clock_speed": 2000, + "enabled_ntp": True, + "ntp_server": "pool.ntp.org", + }, + }, + } + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock( + side_effect=lambda req: _mock_response(req, responses) + ) + + result = await get_system_info(mock_client) + + assert "DS920+" in result + assert "DSM 7.2.1-69057" in result + assert "Intel Celeron J4125" in result + assert "4 cores" in result + assert "2000 MHz" in result + assert "8192 MB" in result + assert "42°C" in result + assert "pool.ntp.org" in result + + @respx.mock + async def test_success_with_temperature_warning(self, mock_client: DsmClient) -> None: + """Temperature warning flag should render the warning marker.""" + _install_system_apis(mock_client) + responses = { + "SYNO.DSM.Info": { + "success": True, + "data": { + "model": "DS920+", + "version_string": "DSM 7.2", + "temperature": 65, + "temperature_warn": True, + }, + }, + "SYNO.Core.System": {"success": True, "data": {}}, + } + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock( + side_effect=lambda req: _mock_response(req, responses) + ) + + result = await get_system_info(mock_client) + + assert "65°C" in result + assert "WARNING" in result + + @respx.mock + async def test_success_dsm_only_no_core(self, mock_client: DsmClient) -> None: + """Core.System missing from API cache should still produce output from DSM.Info.""" + mock_client._api_cache["SYNO.DSM.Info"] = ApiInfoEntry( + path="entry.cgi", min_version=1, max_version=3 + ) + # Intentionally do not install SYNO.Core.System — _fetch_core_system_info + # must short-circuit and return {} when the API is absent from the cache. + responses = { + "SYNO.DSM.Info": { + "success": True, + "data": {"model": "DS220j", "version_string": "DSM 7.0"}, + }, + } + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock( + side_effect=lambda req: _mock_response(req, responses) + ) + + result = await get_system_info(mock_client) + assert "DS220j" in result + assert "DSM 7.0" in result + + @respx.mock + async def test_both_sources_fail_returns_unavailable(self, mock_client: DsmClient) -> None: + """If both API calls raise, the tool should emit a retryable unavailable error.""" + _install_system_apis(mock_client) + respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( + json={"success": False, "error": {"code": 105}} + ) + + with pytest.raises(ToolError) as exc_info: + await get_system_info(mock_client) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "unavailable" + assert body["error"]["retryable"] is True + + @respx.mock + async def test_both_sources_return_empty_data_returns_unavailable( + self, mock_client: DsmClient + ) -> None: + """API calls succeed but populate no fields → unavailable, retryable. + + Hits the first ``error_response`` branch (``not dsm and not core``) + because empty dicts are falsy. + """ + _install_system_apis(mock_client) + responses = { + "SYNO.DSM.Info": {"success": True, "data": {}}, + "SYNO.Core.System": {"success": True, "data": {}}, + } + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock( + side_effect=lambda req: _mock_response(req, responses) + ) + + with pytest.raises(ToolError) as exc_info: + await get_system_info(mock_client) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "unavailable" + assert body["error"]["retryable"] is True + + @respx.mock + async def test_unrecognized_fields_produce_no_pairs(self, mock_client: DsmClient) -> None: + """Sources return non-empty dicts but with fields we don't extract. + + This is the SECOND ``if not pairs: error_response(...)`` branch — + distinct from the empty-dict case above. The dicts are truthy so + the first branch is skipped, but since none of the expected + fields (model, version_string, ram, cpu_series, etc.) are + populated, ``pairs`` stays empty and the late-branch + ``unavailable`` fires at the bottom of the function. + """ + _install_system_apis(mock_client) + responses = { + "SYNO.DSM.Info": { + "success": True, + "data": {"some_unknown_field": "value", "another_unknown": 42}, + }, + "SYNO.Core.System": { + "success": True, + "data": {"yet_another_field": "also unknown"}, + }, + } + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock( + side_effect=lambda req: _mock_response(req, responses) + ) + + with pytest.raises(ToolError) as exc_info: + await get_system_info(mock_client) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "unavailable" + assert body["error"]["retryable"] is True + assert "No system information returned" in body["error"]["message"] + + @respx.mock + async def test_uptime_formatting(self, mock_client: DsmClient) -> None: + """Uptime seconds should be formatted as days/hours/minutes.""" + _install_system_apis(mock_client) + # 2 days, 3 hours, 15 minutes = 2*86400 + 3*3600 + 15*60 = 184500 + responses = { + "SYNO.DSM.Info": { + "success": True, + "data": {"model": "DS920+", "uptime": 184500}, + }, + "SYNO.Core.System": {"success": True, "data": {}}, + } + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock( + side_effect=lambda req: _mock_response(req, responses) + ) + + result = await get_system_info(mock_client) + assert "2d" in result + assert "3h" in result + assert "15m" in result + + @respx.mock + async def test_uptime_under_one_minute(self, mock_client: DsmClient) -> None: + """Uptime of less than 60 seconds renders as '< 1m'.""" + _install_system_apis(mock_client) + responses = { + "SYNO.DSM.Info": { + "success": True, + "data": {"model": "DS920+", "uptime": 30}, + }, + "SYNO.Core.System": {"success": True, "data": {}}, + } + respx.get(f"{BASE_URL}/webapi/entry.cgi").mock( + side_effect=lambda req: _mock_response(req, responses) + ) + + result = await get_system_info(mock_client) + assert "< 1m" in result diff --git a/tests/modules/system/test_utilization.py b/tests/modules/system/test_utilization.py new file mode 100644 index 0000000..905593b --- /dev/null +++ b/tests/modules/system/test_utilization.py @@ -0,0 +1,237 @@ +"""Tests for modules/system/utilization.py — get_resource_usage.""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING + +import pytest +import respx +from mcp.server.fastmcp.exceptions import ToolError + +from mcp_synology.core.state import ApiInfoEntry +from mcp_synology.modules.system.utilization import get_resource_usage +from tests.conftest import BASE_URL + +if TYPE_CHECKING: + from mcp_synology.core.client import DsmClient + + +def _install_utilization_api(client: DsmClient) -> None: + client._api_cache["SYNO.Core.System.Utilization"] = ApiInfoEntry( + path="entry.cgi", min_version=1, max_version=3 + ) + + +class TestGetResourceUsage: + @respx.mock + async def test_success_full_payload(self, mock_client: DsmClient) -> None: + """Happy path: CPU + memory + network + disk all populated.""" + _install_utilization_api(mock_client) + respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( + json={ + "success": True, + "data": { + "cpu": { + "user_load": 12, + "system_load": 5, + "other_load": 2, + "1min_load": 150, + "5min_load": 120, + "15min_load": 100, + }, + "memory": { + "memory_size": 8388608, + "real_usage": 35, + "avail_real": 5452595, + "swap_usage": 2, + "total_swap": 2097152, + }, + "network": [ + {"device": "total", "rx": 1024000, "tx": 512000}, + {"device": "eth0", "rx": 512000, "tx": 256000}, + ], + "disk": { + "disk": [ + { + "device": "sda", + "utilization": 15, + "read_byte": 1024, + "write_byte": 2048, + } + ] + }, + }, + } + ) + + result = await get_resource_usage(mock_client) + assert "Resource Usage" in result + assert "eth0" in result + assert "sda" in result + + @respx.mock + async def test_cpu_other_load_fallback_format(self, mock_client: DsmClient) -> None: + """Alternate CPU format: no 15min_load, no system/user_load, just other_load. + + Covers the ``elif "other_load" in cpu:`` branch — some DSM versions + report CPU purely as ``other_load`` (e.g., single-percent roll-up) + rather than the system/user split or the load-average block. + """ + _install_utilization_api(mock_client) + respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( + json={ + "success": True, + "data": { + "cpu": {"other_load": 42}, + "memory": {"real_usage": 10}, + "disk": {"disk": []}, + }, + } + ) + result = await get_resource_usage(mock_client) + assert "CPU usage" in result + assert "42%" in result + + @respx.mock + async def test_memory_cached_and_swap_detail(self, mock_client: DsmClient) -> None: + """Memory payload with cached bytes and swap-in activity. + + Covers the ``if cached:`` and ``if swap_used:`` branches in + ``_format_memory``. Both are conditional additions to the memory + output that only trigger when the NAS reports non-zero values. + """ + _install_utilization_api(mock_client) + respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( + json={ + "success": True, + "data": { + "cpu": {"user_load": 5, "system_load": 2, "other_load": 0}, + "memory": { + "memory_size": 8388608, # 8 GB in KB + "real_usage": 40, + "avail_real": 5242880, # ~5 GB + "cached": 1048576, # 1 GB cached + "si_disk": 25, # active swap-in + }, + "disk": {"disk": []}, + }, + } + ) + result = await get_resource_usage(mock_client) + assert "1024 MB cached" in result + assert "Swap in" in result + assert "25 pages/s" in result + + @respx.mock + async def test_disk_unexpected_type_falls_through_to_empty_list( + self, mock_client: DsmClient + ) -> None: + """Disk as neither dict nor list → treated as empty, no disk rows. + + Covers the ``else: disk_list = []`` fallback in get_resource_usage. + Should not raise — just render without any disk entries. Uses a + CPU/memory payload so ``pairs`` is non-empty and we don't trip + the ``unavailable`` branch. + """ + _install_utilization_api(mock_client) + respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( + json={ + "success": True, + "data": { + "cpu": {"user_load": 5, "system_load": 2}, + "memory": {"real_usage": 30}, + # Neither dict with 'disk' key nor a bare list — e.g., + # DSM returned a string sentinel or null-like value. + "disk": "unavailable", + }, + } + ) + result = await get_resource_usage(mock_client) + assert "Resource Usage" in result + assert "CPU usage" in result + # No disk entries rendered + assert "Disk (" not in result + + @respx.mock + async def test_success_disk_as_list(self, mock_client: DsmClient) -> None: + """DSM sometimes returns disk as a bare list instead of {'disk': [...]}.""" + _install_utilization_api(mock_client) + respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( + json={ + "success": True, + "data": { + "cpu": {"user_load": 10, "system_load": 5, "other_load": 0}, + "memory": {"memory_size": 8388608, "real_usage": 30}, + "disk": [ + { + "device": "sdb", + "utilization": 25, + "read_byte": 0, + "write_byte": 0, + } + ], + }, + } + ) + result = await get_resource_usage(mock_client) + assert "sdb" in result + + async def test_api_not_in_cache_returns_api_not_found(self, mock_client: DsmClient) -> None: + """If SYNO.Core.System.Utilization isn't cached, report api_not_found.""" + # Deliberately do NOT install the API + with pytest.raises(ToolError) as exc_info: + await get_resource_usage(mock_client) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + assert body["error"]["code"] == "api_not_found" + assert body["error"]["retryable"] is False + + @respx.mock + async def test_dsm_105_maps_to_permission_denied(self, mock_client: DsmClient) -> None: + """DSM code 105 on this API means the account isn't admin — not a session issue.""" + _install_utilization_api(mock_client) + respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( + json={"success": False, "error": {"code": 105}} + ) + + with pytest.raises(ToolError) as exc_info: + await get_resource_usage(mock_client) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "permission_denied" + assert body["error"]["retryable"] is False + assert "admin" in body["error"]["suggestion"].lower() + + @respx.mock + async def test_other_dsm_error_propagates_as_synology_error( + self, mock_client: DsmClient + ) -> None: + """Non-105 DSM errors fall through to synology_error_response.""" + _install_utilization_api(mock_client) + respx.get(f"{BASE_URL}/webapi/entry.cgi").respond( + json={"success": False, "error": {"code": 100}} + ) + + with pytest.raises(ToolError) as exc_info: + await get_resource_usage(mock_client) + body = json.loads(str(exc_info.value)) + # Code 100 is unknown/generic — routes to dsm_error + assert body["error"]["code"] == "dsm_error" + + @respx.mock + async def test_empty_payload_returns_unavailable_retryable( + self, mock_client: DsmClient + ) -> None: + """API returns success but no metrics → unavailable/retryable=True. + + This is the finding-2 fix: previously this call site emitted + retryable=False, contradicting info.py and the PR body table. + """ + _install_utilization_api(mock_client) + respx.get(f"{BASE_URL}/webapi/entry.cgi").respond(json={"success": True, "data": {}}) + + with pytest.raises(ToolError) as exc_info: + await get_resource_usage(mock_client) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "unavailable" + assert body["error"]["retryable"] is True diff --git a/tests/test_integration.py b/tests/test_integration.py index 54db24b..c8281b0 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -14,12 +14,14 @@ from __future__ import annotations import asyncio +import json import logging from pathlib import Path from typing import Any import pytest import yaml +from mcp.server.fastmcp.exceptions import ToolError from mcp_synology.core.auth import AuthManager from mcp_synology.core.client import DsmClient @@ -259,24 +261,24 @@ async def test_list_shares(self, nas_client: Any) -> None: client, _, _, _ = _unpack(nas_client) result = await list_shares(client) assert "Name" in result # table header - assert "[!]" not in result logger.info("list_shares output:\n%s", result) async def test_list_files_existing_share(self, nas_client: Any) -> None: """Should list files in a known share.""" client, _, _, paths = _unpack(nas_client) result = await list_files(client, path=paths["existing_share"]) - assert "[!]" not in result logger.info("list_files(%s):\n%s", paths["existing_share"], result) async def test_list_files_root(self, nas_client: Any) -> None: """Listing '/' may fail on some DSM versions — verify graceful handling.""" client, _, _, _ = _unpack(nas_client) - result = await list_files(client, path="/") - logger.info("list_files(/):\n%s", result) # On some DSM versions, listing '/' via FileStation.List fails (error 401). - # Use list_shares instead. Here we just verify it doesn't crash. - assert isinstance(result, str) + # Use list_shares instead. Here we just verify it doesn't crash hard. + try: + result = await list_files(client, path="/") + logger.info("list_files(/):\n%s", result) + except ToolError as e: + logger.info("list_files(/) raised ToolError (expected on some DSM): %s", e) async def test_list_files_sorted_by_size(self, nas_client: Any) -> None: """List files sorted by size descending.""" @@ -284,22 +286,22 @@ async def test_list_files_sorted_by_size(self, nas_client: Any) -> None: result = await list_files( client, path=paths["existing_share"], sort_by="size", sort_direction="desc" ) - assert "[!]" not in result logger.info("list_files sorted by size:\n%s", result) async def test_list_files_with_limit(self, nas_client: Any) -> None: """List files with a small limit to test pagination.""" client, _, _, paths = _unpack(nas_client) result = await list_files(client, path=paths["existing_share"], limit=3) - assert "[!]" not in result logger.info("list_files limit=3:\n%s", result) async def test_list_files_invalid_path(self, nas_client: Any) -> None: - """Listing a non-existent path should return a formatted error.""" + """Listing a non-existent path should raise ToolError.""" client, _, _, _ = _unpack(nas_client) - result = await list_files(client, path="/zzz_nonexistent_share_999") - assert "[!]" in result - logger.info("list_files(invalid):\n%s", result) + with pytest.raises(ToolError) as exc_info: + await list_files(client, path="/zzz_nonexistent_share_999") + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + logger.info("list_files(invalid) raised ToolError: %s", exc_info.value) # --------------------------------------------------------------------------- @@ -338,7 +340,6 @@ async def test_search_keyword_finds_directory(self, nas_client: Any) -> None: result = await search_files(client, folder_path=folder, pattern=keyword) logger.info("search_files(%s, pattern=%s):\n%s", folder, keyword, result) - assert "[!]" not in result assert "0 results found" not in result, ( f"Search for '{keyword}' in {folder} returned 0 results. " "Verify the search_keyword and search_folder in integration_config.yaml. " @@ -351,7 +352,6 @@ async def test_search_by_extension(self, nas_client: Any) -> None: await asyncio.sleep(2) result = await search_files(client, folder_path=paths["existing_share"], pattern="*.stl") logger.info("Extension search (*.stl):\n%s", result) - assert "[!]" not in result async def test_search_no_results(self, nas_client: Any) -> None: """Search for a nonsense pattern should return 0 results, not an error.""" @@ -361,16 +361,17 @@ async def test_search_no_results(self, nas_client: Any) -> None: client, folder_path=paths["existing_share"], pattern="zzz_nonexistent_xyzzy_999" ) assert "0 results found" in result - assert "[!]" not in result async def test_search_from_root_error_handling(self, nas_client: Any) -> None: """Searching from '/' may fail — verify we handle it gracefully.""" client, _, _, _ = _unpack(nas_client) await asyncio.sleep(2) - result = await search_files(client, folder_path="/", pattern="test") - logger.info("Root search result:\n%s", result) - # Whether it returns results or an error, it should not crash. - assert isinstance(result, str) + # Whether it returns results or raises ToolError, it should not crash hard. + try: + result = await search_files(client, folder_path="/", pattern="test") + logger.info("Root search result:\n%s", result) + except ToolError as e: + logger.info("Root search raised ToolError (expected on some DSM): %s", e) # --------------------------------------------------------------------------- @@ -386,7 +387,6 @@ async def test_get_file_info_share(self, nas_client: Any) -> None: """Get info about a known share folder.""" client, _, _, paths = _unpack(nas_client) result = await get_file_info(client, paths=[paths["existing_share"]]) - assert "[!]" not in result logger.info("get_file_info(%s):\n%s", paths["existing_share"], result) async def test_get_file_info_multiple_paths(self, nas_client: Any) -> None: @@ -396,19 +396,20 @@ async def test_get_file_info_multiple_paths(self, nas_client: Any) -> None: client, paths=[paths["existing_share"], paths["writable_folder"]], ) - assert "[!]" not in result logger.info("get_file_info(multiple):\n%s", result) async def test_get_file_info_invalid_path(self, nas_client: Any) -> None: """Get info about a non-existent path — should not crash. - DSM may return success with empty metadata or an error depending + DSM may return success with empty metadata or raise ToolError depending on the path format. We just verify graceful handling. """ client, _, _, _ = _unpack(nas_client) - result = await get_file_info(client, paths=["/zzz_nonexistent_999/fake.txt"]) - assert isinstance(result, str) - logger.info("get_file_info(invalid):\n%s", result) + try: + result = await get_file_info(client, paths=["/zzz_nonexistent_999/fake.txt"]) + logger.info("get_file_info(invalid):\n%s", result) + except ToolError as e: + logger.info("get_file_info(invalid) raised ToolError: %s", e) async def test_get_dir_size(self, nas_client: Any) -> None: """Get size of a known folder (uses a smaller folder to avoid timeouts).""" @@ -416,7 +417,6 @@ async def test_get_dir_size(self, nas_client: Any) -> None: # Use the writable folder (likely small) rather than the existing_share # which may be very large and cause the background task to time out. result = await get_dir_size(client, path=paths["writable_folder"]) - assert "[!]" not in result assert "Total size" in result logger.info("get_dir_size(%s):\n%s", paths["writable_folder"], result) @@ -451,9 +451,12 @@ async def test_01_create_folder(self, nas_client: Any) -> None: base = paths["writable_folder"] # Create main test dir and a subfolder to use as copy source - result = await create_folder(client, paths=[f"{base}/{self._COPY_SRC}"]) - logger.info("create_folder result:\n%s", result) - assert "[!]" not in result or "already exists" in result.lower() + try: + result = await create_folder(client, paths=[f"{base}/{self._COPY_SRC}"]) + logger.info("create_folder result:\n%s", result) + except ToolError as e: + body = json.loads(str(e)) + assert body["error"]["code"] == "already_exists", f"Unexpected error: {e}" async def test_02_create_folder_idempotent(self, nas_client: Any) -> None: """Creating the same folder again should not error (idempotent).""" @@ -480,7 +483,6 @@ async def test_03_copy_folder(self, nas_client: Any) -> None: result = await copy_files(client, paths=[src], dest_folder=dest) logger.info("copy_files result:\n%s", result) - assert "[!]" not in result assert "Copied" in result async def test_04_verify_copy_exists(self, nas_client: Any) -> None: @@ -504,7 +506,6 @@ async def test_05_copy_with_overwrite(self, nas_client: Any) -> None: result = await copy_files(client, paths=[src], dest_folder=dest, overwrite=True) logger.info("copy_files (overwrite):\n%s", result) - assert "[!]" not in result async def test_06_move_folder(self, nas_client: Any) -> None: """Move the copied folder to a new name.""" @@ -521,7 +522,6 @@ async def test_06_move_folder(self, nas_client: Any) -> None: result = await move_files(client, paths=[src], dest_folder=dest) logger.info("move_files result:\n%s", result) - assert "[!]" not in result assert "Moved" in result async def test_07_verify_move(self, nas_client: Any) -> None: @@ -551,7 +551,6 @@ async def test_08_rename(self, nas_client: Any) -> None: target = f"{base}/{self._MOVE_DEST}/original" result = await rename(client, path=target, new_name="renamed_test") logger.info("rename result:\n%s", result) - assert "[!]" not in result async def test_09_delete_cleanup(self, nas_client: Any) -> None: """Delete the entire test folder tree (cleanup).""" @@ -562,7 +561,6 @@ async def test_09_delete_cleanup(self, nas_client: Any) -> None: target = f"{base}/{self._TEST_DIR}" result = await delete_files(client, paths=[target], recursive=True) logger.info("delete result:\n%s", result) - assert "[!]" not in result async def test_10_verify_deleted(self, nas_client: Any) -> None: """Verify the test folder is gone from the writable area.""" @@ -598,13 +596,15 @@ async def test_01_create_and_delete(self, nas_client: Any) -> None: folder = f"{base}/{self._RECYCLE_TEST}" # Create - result = await create_folder(client, paths=[folder]) - assert "[!]" not in result or "already exists" in result.lower() + try: + result = await create_folder(client, paths=[folder]) + except ToolError as e: + body = json.loads(str(e)) + assert body["error"]["code"] == "already_exists", f"Unexpected error: {e}" # Delete (should go to recycle bin if enabled) result = await delete_files(client, paths=[folder]) logger.info("delete result:\n%s", result) - assert "[!]" not in result async def test_02_list_recycle_bin(self, nas_client: Any) -> None: """List the recycle bin for the writable folder's share.""" @@ -675,15 +675,17 @@ async def test_02_upload_duplicate_no_overwrite(self, nas_client: Any, tmp_path: local_file = tmp_path / "upload_test.txt" local_file.write_bytes(self._TEST_CONTENT) - result = await upload_file( - client, - local_path=str(local_file), - dest_folder=dest, - ) - logger.info("upload duplicate (no overwrite):\n%s", result) - # DSM may return success (silent overwrite) or error (already exists). - # Either is acceptable — we just verify it doesn't crash. - assert isinstance(result, str) + # DSM may return success (silent overwrite) or raise ToolError (already exists). + # Either is acceptable — we just verify it doesn't crash hard. + try: + result = await upload_file( + client, + local_path=str(local_file), + dest_folder=dest, + ) + logger.info("upload duplicate (no overwrite):\n%s", result) + except ToolError as e: + logger.info("upload duplicate raised ToolError (expected): %s", e) async def test_03_upload_overwrite(self, nas_client: Any, tmp_path: Path) -> None: """Uploading with overwrite=True should succeed.""" @@ -790,26 +792,29 @@ async def test_08_download_no_overwrite(self, nas_client: Any, tmp_path: Path) - existing = tmp_path / "upload_test.txt" existing.write_text("existing local content") - result = await download_file( - client, - path=nas_path, - dest_folder=str(tmp_path), - ) - logger.info("download no overwrite:\n%s", result) - assert "[!]" in result - assert "already exists" in result + with pytest.raises(ToolError) as exc_info: + await download_file( + client, + path=nas_path, + dest_folder=str(tmp_path), + ) + body = json.loads(str(exc_info.value)) + assert body["error"]["code"] == "already_exists" + logger.info("download no overwrite raised ToolError: %s", exc_info.value) async def test_09_download_nonexistent(self, nas_client: Any, tmp_path: Path) -> None: - """Download a non-existent NAS file should return formatted error.""" + """Download a non-existent NAS file should raise ToolError.""" client, _, _, _ = _unpack(nas_client) - result = await download_file( - client, - path="/zzz_nonexistent_999/fake.txt", - dest_folder=str(tmp_path), - ) - logger.info("download nonexistent:\n%s", result) - assert "[!]" in result + with pytest.raises(ToolError) as exc_info: + await download_file( + client, + path="/zzz_nonexistent_999/fake.txt", + dest_folder=str(tmp_path), + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + logger.info("download nonexistent raised ToolError: %s", exc_info.value) async def test_10_cleanup(self, nas_client: Any) -> None: """Delete the test upload directory from the NAS.""" @@ -820,7 +825,6 @@ async def test_10_cleanup(self, nas_client: Any) -> None: target = f"{base}/{self._UPLOAD_DIR}" result = await delete_files(client, paths=[target], recursive=True) logger.info("transfer cleanup:\n%s", result) - assert "[!]" not in result # --------------------------------------------------------------------------- @@ -833,35 +837,41 @@ class TestErrorHandling: """Test that errors are handled gracefully, not crashes.""" async def test_copy_invalid_source(self, nas_client: Any) -> None: - """Copy from a non-existent path should return formatted error.""" + """Copy from a non-existent path should raise ToolError.""" client, _, config, paths = _unpack(nas_client) _skip_unless_write(config) - result = await copy_files( - client, - paths=["/zzz_nonexistent_999/fake.txt"], - dest_folder=paths["writable_folder"], - ) - logger.info("copy invalid source:\n%s", result) - assert "[!]" in result + with pytest.raises(ToolError) as exc_info: + await copy_files( + client, + paths=["/zzz_nonexistent_999/fake.txt"], + dest_folder=paths["writable_folder"], + ) + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + logger.info("copy invalid source raised ToolError: %s", exc_info.value) async def test_delete_invalid_path(self, nas_client: Any) -> None: - """Delete a non-existent path should return formatted error.""" + """Delete a non-existent path should raise ToolError or succeed silently.""" client, _, config, paths = _unpack(nas_client) _skip_unless_write(config) - result = await delete_files(client, paths=["/zzz_nonexistent_999/fake.txt"]) - logger.info("delete invalid path:\n%s", result) # May succeed silently (DSM doesn't always error on missing paths) - # or return an error. Either way, should not crash. - assert isinstance(result, str) + # or raise ToolError. Either way, should not crash. + try: + result = await delete_files(client, paths=["/zzz_nonexistent_999/fake.txt"]) + logger.info("delete invalid path:\n%s", result) + except ToolError as e: + logger.info("delete invalid path raised ToolError: %s", e) async def test_rename_invalid_path(self, nas_client: Any) -> None: - """Rename a non-existent path should return formatted error.""" + """Rename a non-existent path should raise ToolError.""" client, _, _, _ = _unpack(nas_client) - result = await rename(client, path="/zzz_nonexistent_999/fake.txt", new_name="new_name") - logger.info("rename invalid path:\n%s", result) - assert "[!]" in result + with pytest.raises(ToolError) as exc_info: + await rename(client, path="/zzz_nonexistent_999/fake.txt", new_name="new_name") + body = json.loads(str(exc_info.value)) + assert body["status"] == "error" + logger.info("rename invalid path raised ToolError: %s", exc_info.value) # --------------------------------------------------------------------------- @@ -878,7 +888,6 @@ async def test_get_system_info(self, nas_client: Any) -> None: client, _, _, _ = _unpack(nas_client) result = await get_system_info(client) logger.info("get_system_info:\n%s", result) - assert "[!]" not in result assert "Model" in result assert "Firmware" in result assert "Temperature" in result @@ -903,19 +912,22 @@ class TestResourceUsage: async def test_resource_usage_non_admin(self, nas_client: Any) -> None: """Non-admin user should get a clear permission error.""" client, _, _, _ = _unpack(nas_client) - result = await get_resource_usage(client) - logger.info("get_resource_usage (non-admin):\n%s", result) # Should fail with permission error for non-admin users - if "[!]" in result: - assert "admin" in result.lower() or "permission" in result.lower() - # If it succeeds, the user IS admin — that's fine too + # or succeed if the user IS admin. + try: + result = await get_resource_usage(client) + logger.info("get_resource_usage (non-admin succeeded — user is admin):\n%s", result) + except ToolError as e: + body = json.loads(str(e)) + logger.info("get_resource_usage (non-admin) raised ToolError: %s", e) + msg = body["error"]["message"].lower() + assert "admin" in msg or "permission" in msg async def test_resource_usage_admin(self, admin_client: Any) -> None: """Admin user should get real utilization data.""" client, _, _, _ = _unpack(admin_client) result = await get_resource_usage(client) logger.info("get_resource_usage (admin):\n%s", result) - assert "[!]" not in result assert "CPU" in result assert "Memory" in result @@ -931,7 +943,6 @@ async def test_utilization_before_and_during_load(self, admin_client: Any) -> No # Baseline reading — verify NAS isn't already overloaded baseline = await get_resource_usage(client) logger.info("Baseline utilization:\n%s", baseline) - assert "[!]" not in baseline, "Baseline should succeed with admin" assert "CPU" in baseline, "Baseline should include CPU data" # Start a heavy operation concurrently @@ -948,6 +959,5 @@ async def test_utilization_before_and_during_load(self, admin_client: Any) -> No await dir_task # Both readings should be valid - assert "[!]" not in during_load assert "CPU" in during_load logger.info("Utilization test passed — both readings returned valid data")