diff --git a/CLAUDE.md b/CLAUDE.md index 530bb71a2a..e6dbbea9cf 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -151,7 +151,7 @@ web/ # Vue 3 + PrimeVue + Tailwind CSS dashboard - **Every module** with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)` - **Never** use `import logging` / `logging.getLogger()` / `print()` in application code - **Variable name**: always `logger` (not `_logger`, not `log`) -- **Event names**: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`, `CFO_ANOMALY_DETECTED` from `events.cfo`, `CONFLICT_DETECTED` from `events.conflict`, `MEETING_STARTED` from `events.meeting`, `CLASSIFICATION_START` from `events.classification`, `CONSOLIDATION_START` from `events.consolidation`, `ORG_MEMORY_QUERY_START` from `events.org_memory`, `API_REQUEST_STARTED` from `events.api`, `API_ROUTE_NOT_FOUND` from `events.api`, `CODE_RUNNER_EXECUTE_START` from `events.code_runner`, `DOCKER_EXECUTE_START` from `events.docker`, `MCP_INVOKE_START` from `events.mcp`, `SECURITY_EVALUATE_START` from `events.security`, `HR_HIRING_REQUEST_CREATED` from `events.hr`, `PERF_METRIC_RECORDED` from `events.performance`, `TRUST_EVALUATE_START` from `events.trust`, `PROMOTION_EVALUATE_START` from `events.promotion`, `PROMPT_BUILD_START` from `events.prompt`, `MEMORY_RETRIEVAL_START` from `events.memory`, `MEMORY_BACKEND_CONNECTED` from `events.memory`, `MEMORY_ENTRY_STORED` from `events.memory`, `MEMORY_BACKEND_SYSTEM_ERROR` from `events.memory`, `AUTONOMY_ACTION_AUTO_APPROVED` from `events.autonomy`, `TIMEOUT_POLICY_EVALUATED` from `events.timeout`, `PERSISTENCE_AUDIT_ENTRY_SAVED` from `events.persistence`, `TASK_ENGINE_STARTED` from `events.task_engine`, `COORDINATION_STARTED` from `events.coordination`, `COMMUNICATION_DISPATCH_START` from `events.communication`, `COMPANY_STARTED` from `events.company`, `CONFIG_LOADED` from `events.config`, `CORRELATION_ID_CREATED` from `events.correlation`, `DECOMPOSITION_STARTED` from `events.decomposition`, `DELEGATION_STARTED` from `events.delegation`, `EXECUTION_LOOP_START` from `events.execution`, `CHECKPOINT_SAVED` from `events.checkpoint`, `PERSISTENCE_CHECKPOINT_SAVED` from `events.persistence`, `GIT_OPERATION_START` from `events.git`, `PARALLEL_GROUP_START` from `events.parallel`, `PERSONALITY_LOADED` from `events.personality`, `QUOTA_CHECKED` from `events.quota`, `ROLE_ASSIGNED` from `events.role`, `ROUTING_STARTED` from `events.routing`, `SANDBOX_EXECUTE_START` from `events.sandbox`, `TASK_CREATED` from `events.task`, `TASK_ASSIGNMENT_STARTED` from `events.task_assignment`, `TASK_ROUTING_STARTED` from `events.task_routing`, `TEMPLATE_LOADED` from `events.template`, `TOOL_INVOKE_START` from `events.tool`, `WORKSPACE_CREATED` from `events.workspace`, `APPROVAL_GATE_ESCALATION_DETECTED` from `events.approval_gate`, `APPROVAL_GATE_ESCALATION_FAILED` from `events.approval_gate`, `APPROVAL_GATE_INITIALIZED` from `events.approval_gate`, `APPROVAL_GATE_RISK_CLASSIFIED` from `events.approval_gate`, `APPROVAL_GATE_RISK_CLASSIFY_FAILED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_PARKED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_PARK_FAILED` from `events.approval_gate`, `APPROVAL_GATE_PARK_TASKLESS` from `events.approval_gate`, `APPROVAL_GATE_RESUME_STARTED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_RESUMED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_FAILED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_DELETE_FAILED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_TRIGGERED` from `events.approval_gate`, `APPROVAL_GATE_NO_PARKED_CONTEXT` from `events.approval_gate`, `APPROVAL_GATE_LOOP_WIRING_WARNING` from `events.approval_gate`). Import directly: `from ai_company.observability.events. import EVENT_CONSTANT` +- **Event names**: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`, `CFO_ANOMALY_DETECTED` from `events.cfo`, `CONFLICT_DETECTED` from `events.conflict`, `MEETING_STARTED` from `events.meeting`, `CLASSIFICATION_START` from `events.classification`, `CONSOLIDATION_START` from `events.consolidation`, `ORG_MEMORY_QUERY_START` from `events.org_memory`, `API_REQUEST_STARTED` from `events.api`, `API_ROUTE_NOT_FOUND` from `events.api`, `CODE_RUNNER_EXECUTE_START` from `events.code_runner`, `DOCKER_EXECUTE_START` from `events.docker`, `MCP_INVOKE_START` from `events.mcp`, `SECURITY_EVALUATE_START` from `events.security`, `HR_HIRING_REQUEST_CREATED` from `events.hr`, `PERF_METRIC_RECORDED` from `events.performance`, `TRUST_EVALUATE_START` from `events.trust`, `PROMOTION_EVALUATE_START` from `events.promotion`, `PROMPT_BUILD_START` from `events.prompt`, `MEMORY_RETRIEVAL_START` from `events.memory`, `MEMORY_BACKEND_CONNECTED` from `events.memory`, `MEMORY_ENTRY_STORED` from `events.memory`, `MEMORY_BACKEND_SYSTEM_ERROR` from `events.memory`, `AUTONOMY_ACTION_AUTO_APPROVED` from `events.autonomy`, `TIMEOUT_POLICY_EVALUATED` from `events.timeout`, `PERSISTENCE_AUDIT_ENTRY_SAVED` from `events.persistence`, `TASK_ENGINE_STARTED` from `events.task_engine`, `COORDINATION_STARTED` from `events.coordination`, `COMMUNICATION_DISPATCH_START` from `events.communication`, `COMPANY_STARTED` from `events.company`, `CONFIG_LOADED` from `events.config`, `CORRELATION_ID_CREATED` from `events.correlation`, `DECOMPOSITION_STARTED` from `events.decomposition`, `DELEGATION_STARTED` from `events.delegation`, `EXECUTION_LOOP_START` from `events.execution`, `CHECKPOINT_SAVED` from `events.checkpoint`, `PERSISTENCE_CHECKPOINT_SAVED` from `events.persistence`, `GIT_OPERATION_START` from `events.git`, `PARALLEL_GROUP_START` from `events.parallel`, `PERSONALITY_LOADED` from `events.personality`, `QUOTA_CHECKED` from `events.quota`, `ROLE_ASSIGNED` from `events.role`, `ROUTING_STARTED` from `events.routing`, `SANDBOX_EXECUTE_START` from `events.sandbox`, `TASK_CREATED` from `events.task`, `TASK_ASSIGNMENT_STARTED` from `events.task_assignment`, `TASK_ROUTING_STARTED` from `events.task_routing`, `TEMPLATE_LOADED` from `events.template`, `TOOL_INVOKE_START` from `events.tool`, `TOOL_OUTPUT_WITHHELD` from `events.tool`, `WORKSPACE_CREATED` from `events.workspace`, `APPROVAL_GATE_ESCALATION_DETECTED` from `events.approval_gate`, `APPROVAL_GATE_ESCALATION_FAILED` from `events.approval_gate`, `APPROVAL_GATE_INITIALIZED` from `events.approval_gate`, `APPROVAL_GATE_RISK_CLASSIFIED` from `events.approval_gate`, `APPROVAL_GATE_RISK_CLASSIFY_FAILED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_PARKED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_PARK_FAILED` from `events.approval_gate`, `APPROVAL_GATE_PARK_TASKLESS` from `events.approval_gate`, `APPROVAL_GATE_RESUME_STARTED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_RESUMED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_FAILED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_DELETE_FAILED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_TRIGGERED` from `events.approval_gate`, `APPROVAL_GATE_NO_PARKED_CONTEXT` from `events.approval_gate`, `APPROVAL_GATE_LOOP_WIRING_WARNING` from `events.approval_gate`). Import directly: `from ai_company.observability.events. import EVENT_CONSTANT` - **Structured kwargs**: always `logger.info(EVENT, key=value)` — never `logger.info("msg %s", val)` - **All error paths** must log at WARNING or ERROR with context before raising - **All state transitions** must log at INFO diff --git a/docs/design/operations.md b/docs/design/operations.md index bea6f225e0..8404cf1e6d 100644 --- a/docs/design/operations.md +++ b/docs/design/operations.md @@ -787,14 +787,23 @@ execution. Post-tool-call scanning detects sensitive data in outputs. ### Output Scan Response Policies After the output scanner detects sensitive data, a pluggable `OutputScanResponsePolicy` -protocol decides how to handle the findings: - -| Policy | Behavior | Default for | -|--------|----------|-------------| -| **Redact** (default) | Return scanner's redacted content as-is | `SEMI`, `SUPERVISED` autonomy | -| **Withhold** | Clear redacted content -- fail-closed, no partial data returned | `LOCKED` autonomy | -| **Log-only** | Discard findings (logs at WARNING), pass original output through | `FULL` autonomy | -| **Autonomy-tiered** | Delegate to a sub-policy based on effective autonomy level | Composite policy | +protocol decides how to handle the findings. Each policy sets a `ScanOutcome` enum on the +returned `OutputScanResult` so downstream consumers (primarily `ToolInvoker`) can +distinguish intentional policy decisions from scanner failures: + +| Policy | Behavior | `ScanOutcome` | Default for | +|--------|----------|---------------|-------------| +| **Redact** (default) | Return scanner's redacted content as-is | `REDACTED` | `SEMI`, `SUPERVISED` autonomy | +| **Withhold** | Clear redacted content — content withheld by policy | `WITHHELD` | `LOCKED` autonomy | +| **Log-only** | Discard findings (logs at WARNING), pass original output through | `LOG_ONLY` | `FULL` autonomy | +| **Autonomy-tiered** | Delegate to a sub-policy based on effective autonomy level | *(set by delegate)* | Composite policy | + +The `ScanOutcome` enum (`CLEAN`, `REDACTED`, `WITHHELD`, `LOG_ONLY`) is set by the scanner +(initial `REDACTED` when findings are detected) and may be transformed by the policy (e.g. +`WithholdPolicy` changes `REDACTED` → `WITHHELD`). The `ToolInvoker._scan_output` method +branches on `ScanOutcome.WITHHELD` first to return a dedicated error message ("content +withheld by security policy") with `output_withheld` metadata — distinct from the generic +fail-closed path used for scanner exceptions. Policy selection is declarative via `SecurityConfig.output_scan_policy_type` (`OutputScanPolicyType` enum). A factory function (`build_output_scan_policy`) resolves the diff --git a/src/ai_company/observability/events/tool.py b/src/ai_company/observability/events/tool.py index e5082281d0..aa980ce30c 100644 --- a/src/ai_company/observability/events/tool.py +++ b/src/ai_company/observability/events/tool.py @@ -44,6 +44,7 @@ TOOL_SECURITY_DENIED: Final[str] = "tool.security.denied" TOOL_SECURITY_ESCALATED: Final[str] = "tool.security.escalated" TOOL_OUTPUT_REDACTED: Final[str] = "tool.output.redacted" +TOOL_OUTPUT_WITHHELD: Final[str] = "tool.output.withheld" # ── Subprocess utility events ─────────────────────────────────── TOOL_SUBPROCESS_TRANSPORT_CLOSE_FAILED: Final[str] = ( diff --git a/src/ai_company/security/__init__.py b/src/ai_company/security/__init__.py index df3ce3e383..14005305df 100644 --- a/src/ai_company/security/__init__.py +++ b/src/ai_company/security/__init__.py @@ -7,7 +7,8 @@ - ``SecurityVerdict`` / ``SecurityVerdictType`` — evaluation results. - ``SecurityContext`` — tool invocation context for evaluation. - ``AuditEntry`` / ``AuditLog`` — audit recording. -- ``OutputScanResult`` / ``OutputScanner`` — post-tool output scanning. +- ``OutputScanResult`` / ``ScanOutcome`` / ``OutputScanner`` + — post-tool output scanning. - ``OutputScanResponsePolicy`` — protocol for output scan policies. - ``RedactPolicy`` / ``WithholdPolicy`` / ``LogOnlyPolicy`` / ``AutonomyTieredPolicy`` — policy implementations. @@ -32,6 +33,7 @@ from ai_company.security.models import ( AuditEntry, OutputScanResult, + ScanOutcome, SecurityContext, SecurityVerdict, SecurityVerdictType, @@ -66,6 +68,7 @@ "RedactPolicy", "RuleEngine", "RuleEngineConfig", + "ScanOutcome", "SecOpsService", "SecurityConfig", "SecurityContext", diff --git a/src/ai_company/security/models.py b/src/ai_company/security/models.py index 79b914a05a..4e1fab6150 100644 --- a/src/ai_company/security/models.py +++ b/src/ai_company/security/models.py @@ -20,6 +20,30 @@ from ai_company.core.types import NotBlankStr # noqa: TC001 +class ScanOutcome(StrEnum): + """Outcome of an output scan policy decision. + + Tracks what the scanner/policy *did* with the output so that + downstream consumers (e.g. ``ToolInvoker``) can distinguish + intentional withholding from scanner failure. + + Attributes: + CLEAN: No sensitive data detected (default). + REDACTED: Sensitive data found, redacted content available. + WITHHELD: Content intentionally withheld by policy. + LOG_ONLY: Findings discarded by policy, original content passed + through. Always emitted with ``has_sensitive_data=False`` + because the policy resets the result — the audit log + (written by ``SecOpsService`` before the policy runs) is + the source of truth for what was actually detected. + """ + + CLEAN = "clean" + REDACTED = "redacted" + WITHHELD = "withheld" + LOG_ONLY = "log_only" + + class SecurityVerdictType(StrEnum): """Security verdict constants. @@ -156,6 +180,9 @@ class OutputScanResult(BaseModel): has_sensitive_data: Whether sensitive data was detected. findings: Descriptions of findings. redacted_content: Content with sensitive data replaced, or None. + outcome: What the scanner/policy did with the output. + Allows downstream consumers to distinguish intentional + withholding from scanner failure. """ model_config = ConfigDict(frozen=True) @@ -163,6 +190,7 @@ class OutputScanResult(BaseModel): has_sensitive_data: bool = False findings: tuple[NotBlankStr, ...] = () redacted_content: str | None = None + outcome: ScanOutcome = ScanOutcome.CLEAN @model_validator(mode="after") def _check_consistency(self) -> OutputScanResult: @@ -174,4 +202,25 @@ def _check_consistency(self) -> OutputScanResult: if self.redacted_content is not None: msg = "redacted_content must be None when has_sensitive_data is False" raise ValueError(msg) + if self.outcome in (ScanOutcome.REDACTED, ScanOutcome.WITHHELD): + msg = ( + f"outcome={self.outcome.value!r} is invalid when " + "has_sensitive_data is False" + ) + raise ValueError(msg) + elif self.outcome == ScanOutcome.CLEAN: + msg = "outcome='clean' is invalid when has_sensitive_data is True" + raise ValueError(msg) + elif not self.findings: + msg = "findings must not be empty when has_sensitive_data is True" + raise ValueError(msg) + if self.outcome == ScanOutcome.REDACTED and self.redacted_content is None: + msg = "redacted_content must not be None when outcome is 'redacted'" + raise ValueError(msg) + if self.outcome == ScanOutcome.WITHHELD and self.redacted_content is not None: + msg = "redacted_content must be None when outcome is 'withheld'" + raise ValueError(msg) + if self.outcome == ScanOutcome.LOG_ONLY and self.has_sensitive_data: + msg = "outcome='log_only' is invalid when has_sensitive_data is True" + raise ValueError(msg) return self diff --git a/src/ai_company/security/output_scan_policy.py b/src/ai_company/security/output_scan_policy.py index 11e3baca2e..3014d6bb7a 100644 --- a/src/ai_company/security/output_scan_policy.py +++ b/src/ai_company/security/output_scan_policy.py @@ -14,7 +14,7 @@ from ai_company.observability.events.security import ( SECURITY_OUTPUT_SCAN_POLICY_APPLIED, ) -from ai_company.security.models import OutputScanResult +from ai_company.security.models import OutputScanResult, ScanOutcome if TYPE_CHECKING: from collections.abc import Mapping @@ -95,7 +95,9 @@ def apply( class WithholdPolicy: """Clear redacted content when sensitive data is found. - Forces fail-closed in the invoker — no partial data is returned. + Sets ``ScanOutcome.WITHHELD`` so the invoker returns a dedicated + "withheld by policy" error — no partial data is returned. This + is distinct from the fail-closed path used for scanner errors. The ``findings`` tuple is deliberately preserved so that audit consumers can categorise what was detected without seeing the actual content. @@ -127,7 +129,9 @@ def apply( ) if not scan_result.has_sensitive_data: return scan_result - return scan_result.model_copy(update={"redacted_content": None}) + return scan_result.model_copy( + update={"redacted_content": None, "outcome": ScanOutcome.WITHHELD}, + ) class LogOnlyPolicy: @@ -172,12 +176,12 @@ def apply( agent_id=context.agent_id, note="Sensitive data detected but passed through by log_only policy", ) - else: - logger.debug( - SECURITY_OUTPUT_SCAN_POLICY_APPLIED, - policy="log_only", - has_sensitive_data=False, - ) + return OutputScanResult(outcome=ScanOutcome.LOG_ONLY) + logger.debug( + SECURITY_OUTPUT_SCAN_POLICY_APPLIED, + policy="log_only", + has_sensitive_data=False, + ) return OutputScanResult() diff --git a/src/ai_company/security/output_scanner.py b/src/ai_company/security/output_scanner.py index 0cf4797bcf..64523cf841 100644 --- a/src/ai_company/security/output_scanner.py +++ b/src/ai_company/security/output_scanner.py @@ -13,7 +13,7 @@ SECURITY_OUTPUT_SCAN_FINDING, SECURITY_OUTPUT_SCAN_START, ) -from ai_company.security.models import OutputScanResult +from ai_company.security.models import OutputScanResult, ScanOutcome from ai_company.security.rules.credential_detector import CREDENTIAL_PATTERNS from ai_company.security.rules.data_leak_detector import PII_PATTERNS @@ -67,4 +67,5 @@ def scan(self, output: str) -> OutputScanResult: has_sensitive_data=True, findings=tuple(sorted(set(findings))), redacted_content=redacted, + outcome=ScanOutcome.REDACTED, ) diff --git a/src/ai_company/security/service.py b/src/ai_company/security/service.py index 138e78301b..efe4a5fb12 100644 --- a/src/ai_company/security/service.py +++ b/src/ai_company/security/service.py @@ -284,8 +284,10 @@ async def scan_output( SECURITY_INTERCEPTOR_ERROR, tool_name=context.tool_name, policy=policy_name, + fallback_outcome=result.outcome.value, note="Output scan policy application failed " - "— returning raw scan result", + "— returning raw scan result " + "(may be less strict than intended policy)", ) return result diff --git a/src/ai_company/tools/invoker.py b/src/ai_company/tools/invoker.py index e9ab6b27b0..7a3388a130 100644 --- a/src/ai_company/tools/invoker.py +++ b/src/ai_company/tools/invoker.py @@ -35,7 +35,6 @@ TOOL_INVOKE_SUCCESS, TOOL_INVOKE_TOOL_ERROR, TOOL_INVOKE_VALIDATION_UNEXPECTED, - TOOL_OUTPUT_REDACTED, TOOL_PERMISSION_DENIED, TOOL_SECURITY_DENIED, TOOL_SECURITY_ESCALATED, @@ -45,6 +44,7 @@ from .base import ToolExecutionResult from .errors import ToolExecutionError, ToolNotFoundError, ToolParameterError +from .scan_result_handler import handle_sensitive_scan if TYPE_CHECKING: from collections.abc import Iterable @@ -279,10 +279,14 @@ async def _scan_output( ) -> ToolExecutionResult: """Scan tool output for sensitive data (if interceptor is set). - If sensitive data is found and redacted, returns a new - ``ToolExecutionResult`` with the redacted content. Exceptions - from the scanner are caught — the original result is returned - to avoid destroying valid tool output. + When sensitive data is detected (``has_sensitive_data=True``), + delegates to ``handle_sensitive_scan`` which branches on + ``outcome`` (``WITHHELD`` vs ``REDACTED``). When no sensitive + data is detected (including ``LOG_ONLY`` and ``CLEAN`` + outcomes), the original output passes through unchanged. + + Scanner exceptions are caught and fail-closed — a generic error + result is returned to prevent leaking sensitive data. """ if self._security_interceptor is None: return result @@ -301,43 +305,13 @@ async def _scan_output( tool_name=tool_call.name, ) return ToolExecutionResult( - content=("Output scan failed (fail-closed). Tool output withheld."), + content="Output scan failed (fail-closed). Tool output withheld.", is_error=True, - metadata={"output_scan_failed": True}, + metadata={**result.metadata, "output_scan_failed": True}, ) if scan_result.has_sensitive_data: - if scan_result.redacted_content is not None: - logger.warning( - TOOL_OUTPUT_REDACTED, - tool_call_id=tool_call.id, - tool_name=tool_call.name, - findings=scan_result.findings, - ) - return ToolExecutionResult( - content=scan_result.redacted_content, - is_error=result.is_error, - metadata={ - **result.metadata, - "output_redacted": True, - "redaction_findings": list(scan_result.findings), - }, - ) - # Sensitive data found but no redaction available — fail closed. - logger.warning( - TOOL_OUTPUT_REDACTED, - tool_call_id=tool_call.id, - tool_name=tool_call.name, - findings=scan_result.findings, - note="no redacted content available — withholding output", - ) - return ToolExecutionResult( - content=( - "Sensitive data detected (fail-closed). Tool output withheld." - ), - is_error=True, - metadata={"output_scan_failed": True}, - ) + return handle_sensitive_scan(tool_call, result, scan_result) return result async def invoke(self, tool_call: ToolCall) -> ToolResult: diff --git a/src/ai_company/tools/scan_result_handler.py b/src/ai_company/tools/scan_result_handler.py new file mode 100644 index 0000000000..4911284802 --- /dev/null +++ b/src/ai_company/tools/scan_result_handler.py @@ -0,0 +1,90 @@ +"""Output scan result handler — routes sensitive scan results. + +Standalone function extracted from ``ToolInvoker`` to keep +``invoker.py`` under the 800-line file limit. +""" + +from typing import TYPE_CHECKING + +from ai_company.observability import get_logger +from ai_company.observability.events.tool import ( + TOOL_OUTPUT_REDACTED, + TOOL_OUTPUT_WITHHELD, +) +from ai_company.security.models import OutputScanResult, ScanOutcome + +from .base import ToolExecutionResult + +if TYPE_CHECKING: + from ai_company.providers.models import ToolCall + +logger = get_logger(__name__) + + +def handle_sensitive_scan( + tool_call: ToolCall, + result: ToolExecutionResult, + scan_result: OutputScanResult, +) -> ToolExecutionResult: + """Route a sensitive scan result to the correct handler. + + Branches on ``ScanOutcome``: + + - ``WITHHELD``: return error with "withheld by policy" message. + - ``redacted_content`` present: return redacted content. + - Defensive fallback: withhold output (fail-closed). + + Args: + tool_call: The tool call being processed. + result: The original tool execution result. + scan_result: The scan result with ``has_sensitive_data=True``. + + Returns: + A new ``ToolExecutionResult`` reflecting the scan outcome. + """ + if scan_result.outcome == ScanOutcome.WITHHELD: + logger.warning( + TOOL_OUTPUT_WITHHELD, + tool_call_id=tool_call.id, + tool_name=tool_call.name, + findings=scan_result.findings, + note="content withheld by security policy", + ) + return ToolExecutionResult( + content=("Sensitive data detected — content withheld by security policy."), + is_error=True, + metadata={**result.metadata, "output_withheld": True}, + ) + if scan_result.redacted_content is not None: + logger.warning( + TOOL_OUTPUT_REDACTED, + tool_call_id=tool_call.id, + tool_name=tool_call.name, + findings=scan_result.findings, + ) + return ToolExecutionResult( + content=scan_result.redacted_content, + is_error=result.is_error, + metadata={ + **result.metadata, + "output_redacted": True, + "redaction_findings": list(scan_result.findings), + }, + ) + # Defensive: model_copy() skips model validators, so a policy + # that clears redacted_content without updating outcome could + # produce REDACTED with redacted_content=None. This branch + # catches that case (and future outcome values) — fail-closed. + logger.warning( + TOOL_OUTPUT_WITHHELD, + tool_call_id=tool_call.id, + tool_name=tool_call.name, + findings=scan_result.findings, + outcome=scan_result.outcome.value, + note="no redacted content available — withholding output", + ) + return ToolExecutionResult( + content="Sensitive data detected (fail-closed). Tool output withheld.", + is_error=True, + metadata={**result.metadata, "output_scan_failed": True}, + ) diff --git a/tests/unit/security/test_models.py b/tests/unit/security/test_models.py index b8cabe1250..992d5fc3ff 100644 --- a/tests/unit/security/test_models.py +++ b/tests/unit/security/test_models.py @@ -9,6 +9,7 @@ from ai_company.security.models import ( AuditEntry, OutputScanResult, + ScanOutcome, SecurityContext, SecurityVerdict, SecurityVerdictType, @@ -459,18 +460,21 @@ def test_defaults(self) -> None: assert result.has_sensitive_data is False assert result.findings == () assert result.redacted_content is None + assert result.outcome == ScanOutcome.CLEAN def test_with_findings(self) -> None: result = OutputScanResult( has_sensitive_data=True, findings=("API key detected", "Email address found"), redacted_content="content with [REDACTED]", + outcome=ScanOutcome.REDACTED, ) assert result.has_sensitive_data is True assert len(result.findings) == 2 assert result.findings[0] == "API key detected" assert result.findings[1] == "Email address found" assert result.redacted_content == "content with [REDACTED]" + assert result.outcome == ScanOutcome.REDACTED def test_frozen(self) -> None: result = OutputScanResult() @@ -512,7 +516,108 @@ def test_json_roundtrip(self) -> None: has_sensitive_data=True, findings=("PII detected",), redacted_content="safe output", + outcome=ScanOutcome.REDACTED, ) json_str = result.model_dump_json() restored = OutputScanResult.model_validate_json(json_str) assert restored == result + + @pytest.mark.parametrize( + ("kwargs", "match"), + [ + pytest.param( + { + "has_sensitive_data": True, + "findings": ("leak",), + "outcome": ScanOutcome.CLEAN, + }, + "outcome", + id="clean-rejected-when-sensitive", + ), + pytest.param( + {"has_sensitive_data": False, "outcome": ScanOutcome.REDACTED}, + "outcome", + id="redacted-rejected-when-not-sensitive", + ), + pytest.param( + {"has_sensitive_data": False, "outcome": ScanOutcome.WITHHELD}, + "outcome", + id="withheld-rejected-when-not-sensitive", + ), + pytest.param( + { + "has_sensitive_data": True, + "findings": ("leak",), + "outcome": ScanOutcome.REDACTED, + "redacted_content": None, + }, + "redacted_content", + id="redacted-requires-redacted-content", + ), + pytest.param( + { + "has_sensitive_data": True, + "findings": ("secret",), + "outcome": ScanOutcome.LOG_ONLY, + }, + "outcome", + id="log-only-rejected-when-sensitive", + ), + pytest.param( + { + "has_sensitive_data": True, + "findings": (), + "outcome": ScanOutcome.WITHHELD, + }, + "findings", + id="empty-findings-rejected-when-sensitive", + ), + pytest.param( + { + "has_sensitive_data": True, + "findings": ("secret",), + "outcome": ScanOutcome.WITHHELD, + "redacted_content": "should not be set", + }, + "redacted_content", + id="withheld-rejects-non-none-redacted-content", + ), + ], + ) + def test_outcome_validation_rejects_invalid( + self, + kwargs: dict[str, object], + match: str, + ) -> None: + """Parametrized: invalid outcome/field combinations are rejected.""" + with pytest.raises(ValidationError, match=match): + OutputScanResult(**kwargs) # type: ignore[arg-type] + + @pytest.mark.parametrize( + ("kwargs", "expected_outcome"), + [ + pytest.param( + {"has_sensitive_data": False, "outcome": ScanOutcome.LOG_ONLY}, + ScanOutcome.LOG_ONLY, + id="log-only-accepted-when-not-sensitive", + ), + pytest.param( + { + "has_sensitive_data": True, + "findings": ("secret",), + "outcome": ScanOutcome.WITHHELD, + "redacted_content": None, + }, + ScanOutcome.WITHHELD, + id="withheld-valid-when-sensitive", + ), + ], + ) + def test_outcome_validation_accepts_valid( + self, + kwargs: dict[str, object], + expected_outcome: ScanOutcome, + ) -> None: + """Parametrized: valid outcome/field combinations are accepted.""" + result = OutputScanResult(**kwargs) # type: ignore[arg-type] + assert result.outcome == expected_outcome diff --git a/tests/unit/security/test_output_scan_policy.py b/tests/unit/security/test_output_scan_policy.py index 2341d4611e..932f1cb05b 100644 --- a/tests/unit/security/test_output_scan_policy.py +++ b/tests/unit/security/test_output_scan_policy.py @@ -4,7 +4,7 @@ from ai_company.core.enums import AutonomyLevel, ToolCategory from ai_company.security.autonomy.models import EffectiveAutonomy -from ai_company.security.models import OutputScanResult, SecurityContext +from ai_company.security.models import OutputScanResult, ScanOutcome, SecurityContext from ai_company.security.output_scan_policy import ( _DEFAULT_AUTONOMY_POLICY_MAP, AutonomyTieredPolicy, @@ -36,6 +36,7 @@ def _sensitive_result() -> OutputScanResult: has_sensitive_data=True, findings=("API key detected",), redacted_content="output with [REDACTED]", + outcome=ScanOutcome.REDACTED, ) @@ -58,6 +59,7 @@ def test_sensitive_result_passes_through(self) -> None: assert transformed == result assert transformed.redacted_content == "output with [REDACTED]" + assert transformed.outcome == ScanOutcome.REDACTED def test_clean_result_passes_through(self) -> None: policy = RedactPolicy() @@ -84,6 +86,7 @@ def test_sensitive_result_clears_redacted_content(self) -> None: assert transformed.has_sensitive_data is True assert transformed.redacted_content is None + assert transformed.outcome == ScanOutcome.WITHHELD # Original result is not mutated (immutability contract). assert result.redacted_content == "output with [REDACTED]" @@ -120,6 +123,7 @@ def test_sensitive_result_returns_empty(self) -> None: assert transformed.has_sensitive_data is False assert transformed.findings == () assert transformed.redacted_content is None + assert transformed.outcome == ScanOutcome.LOG_ONLY def test_clean_result_returns_empty(self) -> None: policy = LogOnlyPolicy() @@ -128,6 +132,7 @@ def test_clean_result_returns_empty(self) -> None: transformed = policy.apply(result, _make_context()) assert transformed == OutputScanResult() + assert transformed.outcome == ScanOutcome.CLEAN # ── TestAutonomyTieredPolicy ───────────────────────────────────── @@ -154,8 +159,9 @@ def test_full_autonomy_uses_log_only(self) -> None: transformed = policy.apply(result, _make_context()) - # LogOnlyPolicy returns empty result. + # LogOnlyPolicy returns empty result with LOG_ONLY outcome. assert transformed.has_sensitive_data is False + assert transformed.outcome == ScanOutcome.LOG_ONLY def test_semi_autonomy_uses_redact(self) -> None: """SEMI level delegates to RedactPolicy (default map).""" @@ -179,6 +185,7 @@ def test_locked_autonomy_uses_withhold(self) -> None: # WithholdPolicy clears redacted_content. assert transformed.has_sensitive_data is True assert transformed.redacted_content is None + assert transformed.outcome == ScanOutcome.WITHHELD def test_no_autonomy_falls_back_to_redact(self) -> None: """When effective_autonomy is None, falls back to RedactPolicy.""" diff --git a/tests/unit/security/test_output_scanner.py b/tests/unit/security/test_output_scanner.py index ca4014d59a..3781b43b7f 100644 --- a/tests/unit/security/test_output_scanner.py +++ b/tests/unit/security/test_output_scanner.py @@ -2,6 +2,7 @@ import pytest +from ai_company.security.models import ScanOutcome from ai_company.security.output_scanner import OutputScanner pytestmark = pytest.mark.timeout(30) @@ -27,6 +28,7 @@ def test_clean_text_no_findings(self) -> None: assert result.has_sensitive_data is False assert result.findings == () assert result.redacted_content is None + assert result.outcome == ScanOutcome.CLEAN def test_empty_string_no_findings(self) -> None: result = _scanner().scan("") @@ -73,6 +75,7 @@ def test_credential_detected(self, label: str, text: str) -> None: assert result.has_sensitive_data is True assert len(result.findings) >= 1 + assert result.outcome == ScanOutcome.REDACTED def test_aws_key_in_findings(self) -> None: result = _scanner().scan("AKIAIOSFODNN7EXAMPLE is the key") diff --git a/tests/unit/security/test_service.py b/tests/unit/security/test_service.py index 5cab6ade43..0f72d031db 100644 --- a/tests/unit/security/test_service.py +++ b/tests/unit/security/test_service.py @@ -17,6 +17,7 @@ from ai_company.security.config import SecurityConfig from ai_company.security.models import ( OutputScanResult, + ScanOutcome, SecurityContext, SecurityVerdict, SecurityVerdictType, @@ -343,6 +344,7 @@ async def test_scan_delegates_to_scanner(self) -> None: has_sensitive_data=True, findings=("provider access key",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = _make_service(scan_result=finding_result) ctx = _make_context() @@ -366,6 +368,7 @@ async def test_scan_records_audit_on_findings(self) -> None: has_sensitive_data=True, findings=("Bearer token",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = _make_service(scan_result=finding_result) ctx = _make_context() @@ -641,6 +644,7 @@ async def test_scan_audit_failure_still_returns_result(self) -> None: has_sensitive_data=True, findings=("API key detected",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_failing_audit(finding) ctx = _make_context() @@ -656,6 +660,7 @@ async def test_scan_audit_failure_does_not_propagate(self) -> None: has_sensitive_data=True, findings=("secret",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_failing_audit(finding) ctx = _make_context() @@ -702,6 +707,7 @@ async def test_policy_applied_after_scan(self) -> None: has_sensitive_data=True, findings=("token",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_policy( scan_result=finding, @@ -722,6 +728,7 @@ async def test_policy_transforms_result(self) -> None: has_sensitive_data=True, findings=("key",), redacted_content="redacted output", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_policy( scan_result=finding, @@ -759,6 +766,7 @@ async def test_default_policy_from_config_passes_through(self) -> None: has_sensitive_data=True, findings=("secret",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_policy( scan_result=finding, @@ -780,6 +788,7 @@ async def test_policy_failure_returns_raw_scan_result(self) -> None: has_sensitive_data=True, findings=("token",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_policy( scan_result=finding, @@ -812,6 +821,7 @@ async def test_non_recoverable_policy_errors_propagate( has_sensitive_data=True, findings=("key",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_policy( scan_result=finding, @@ -830,6 +840,7 @@ async def test_audit_preserves_findings_before_policy_clears_them( has_sensitive_data=True, findings=("Bearer token",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_policy( scan_result=finding, diff --git a/tests/unit/tools/test_invoker_output_scan.py b/tests/unit/tools/test_invoker_output_scan.py new file mode 100644 index 0000000000..db1fccc234 --- /dev/null +++ b/tests/unit/tools/test_invoker_output_scan.py @@ -0,0 +1,315 @@ +"""Tests for ScanOutcome-driven output scan handling in ToolInvoker. + +Covers the WITHHELD, LOG_ONLY, and defensive fallback branches of +``handle_sensitive_scan`` (exercised via the ``ToolInvoker`` flow). +Split from ``test_invoker_security.py`` to keep file sizes under +800 lines. +""" + +from datetime import UTC, datetime +from typing import Any +from unittest.mock import AsyncMock + +import pytest + +from ai_company.core.enums import ApprovalRiskLevel, ToolCategory +from ai_company.providers.models import ToolCall +from ai_company.security.models import ( + OutputScanResult, + ScanOutcome, + SecurityContext, + SecurityVerdict, + SecurityVerdictType, +) +from ai_company.tools.base import BaseTool, ToolExecutionResult +from ai_company.tools.invoker import ToolInvoker +from ai_company.tools.registry import ToolRegistry + +pytestmark = pytest.mark.timeout(30) + +_NOW = datetime.now(UTC) + + +# ── Concrete test tool ─────────────────────────────────────────── + + +class _OutputScanTestTool(BaseTool): + """Simple tool for output scan integration tests.""" + + def __init__(self) -> None: + super().__init__( + name="secure_tool", + description="Test tool: secure_tool", + category=ToolCategory.FILE_SYSTEM, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + return ToolExecutionResult( + content=f"executed: {arguments.get('cmd', 'default')}", + ) + + +# ── Helpers ────────────────────────────────────────────────────── + + +def _make_verdict( + *, + verdict: SecurityVerdictType = SecurityVerdictType.ALLOW, + reason: str = "test reason", + risk_level: ApprovalRiskLevel = ApprovalRiskLevel.LOW, + approval_id: str | None = None, +) -> SecurityVerdict: + """Build a SecurityVerdict with sensible defaults.""" + return SecurityVerdict( + verdict=verdict, + reason=reason, + risk_level=risk_level, + evaluated_at=_NOW, + evaluation_duration_ms=1.0, + approval_id=approval_id, + ) + + +def _make_interceptor( + *, + pre_tool_verdict: SecurityVerdict | None = None, + scan_result: OutputScanResult | None = None, +) -> AsyncMock: + """Build a mock SecurityInterceptionStrategy.""" + interceptor = AsyncMock() + interceptor.evaluate_pre_tool = AsyncMock( + return_value=pre_tool_verdict or _make_verdict(), + ) + interceptor.scan_output = AsyncMock( + return_value=scan_result or OutputScanResult(), + ) + return interceptor + + +# ── Fixtures ───────────────────────────────────────────────────── + + +@pytest.fixture +def security_registry() -> ToolRegistry: + return ToolRegistry([_OutputScanTestTool()]) + + +@pytest.fixture +def tool_call() -> ToolCall: + return ToolCall( + id="call_scan_001", + name="secure_tool", + arguments={"cmd": "ls"}, + ) + + +# ── Withheld outcome tests ───────────────────────────────────── + + +@pytest.mark.unit +class TestWithheldOutcome: + """Tests for the WITHHELD scan outcome path in the invoker.""" + + async def test_withheld_outcome_returns_policy_message( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """Withheld outcome returns explicit policy message.""" + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=OutputScanResult( + has_sensitive_data=True, + findings=("secret token",), + redacted_content=None, + outcome=ScanOutcome.WITHHELD, + ), + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + result = await invoker.invoke(tool_call) + assert result.is_error is True + assert "withheld by security policy" in result.content.lower() + + async def test_withheld_metadata_uses_output_withheld_key( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """Withheld outcome sets output_withheld metadata, not output_scan_failed.""" + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=OutputScanResult( + has_sensitive_data=True, + findings=("credential",), + redacted_content=None, + outcome=ScanOutcome.WITHHELD, + ), + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + # Access _scan_output to inspect ToolExecutionResult.metadata + # (ToolResult does not surface the metadata field from the + # execution layer, so the public invoke() API cannot verify + # metadata keys). + tool_exec_result = ToolExecutionResult( + content="raw output", + metadata={"sentinel": True}, + ) + context = SecurityContext( + tool_name="secure_tool", + tool_category=ToolCategory.FILE_SYSTEM, + action_type="code:write", + ) + scan_exec = await invoker._scan_output(tool_call, tool_exec_result, context) + assert scan_exec.metadata.get("output_withheld") is True + assert scan_exec.metadata.get("sentinel") is True + assert "output_scan_failed" not in scan_exec.metadata + + async def test_withheld_takes_priority_over_redacted_content( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """WITHHELD outcome withholds even when redacted_content is present. + + The model validator now rejects WITHHELD + non-None redacted_content + at construction time. This test uses ``model_copy`` (which skips + validators) to verify the invoker's defence-in-depth branching: + WITHHELD is checked before redacted_content. + """ + # Build a valid WITHHELD result, then sneak in redacted_content + # via model_copy (which bypasses the model validator). + base = OutputScanResult( + has_sensitive_data=True, + findings=("token",), + redacted_content=None, + outcome=ScanOutcome.WITHHELD, + ) + broken = base.model_copy( + update={"redacted_content": "partially redacted output"}, + ) + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=broken, + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + result = await invoker.invoke(tool_call) + assert result.is_error is True + assert "withheld by security policy" in result.content.lower() + assert "partially redacted" not in result.content + + +# ── LOG_ONLY outcome tests ────────────────────────────────────── + + +@pytest.mark.unit +class TestLogOnlyOutcome: + """Tests for the LOG_ONLY scan outcome path in the invoker.""" + + async def test_log_only_passes_original_output_through( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """LOG_ONLY outcome passes original tool output through unchanged.""" + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=OutputScanResult(outcome=ScanOutcome.LOG_ONLY), + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + result = await invoker.invoke(tool_call) + assert result.is_error is False + assert result.content == "executed: ls" + + +# ── Defensive fallback tests ──────────────────────────────────── + + +@pytest.mark.unit +class TestDefensiveFallback: + """Tests for the defensive fail-closed fallback in handle_sensitive_scan. + + Exercised via ``ToolInvoker._scan_output``. This branch catches + unexpected states where ``has_sensitive_data=True`` but outcome is + not ``WITHHELD`` and ``redacted_content`` is ``None``. Reachable + when ``model_copy()`` skips validators. + """ + + async def test_defensive_fallback_withholds_on_unexpected_state( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """Defensive fallback withholds when scan has sensitive data + but neither WITHHELD outcome nor redacted_content.""" + # Simulate a broken policy via model_copy (skips validators): + # REDACTED outcome but redacted_content cleared. + base = OutputScanResult( + has_sensitive_data=True, + findings=("leak",), + redacted_content="safe", + outcome=ScanOutcome.REDACTED, + ) + broken = base.model_copy(update={"redacted_content": None}) + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=broken, + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + result = await invoker.invoke(tool_call) + assert result.is_error is True + assert "fail-closed" in result.content.lower() + assert "executed:" not in result.content + + async def test_defensive_fallback_metadata_uses_scan_failed_key( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """Defensive fallback sets output_scan_failed metadata.""" + base = OutputScanResult( + has_sensitive_data=True, + findings=("leak",), + redacted_content="safe", + outcome=ScanOutcome.REDACTED, + ) + broken = base.model_copy(update={"redacted_content": None}) + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=broken, + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + tool_exec_result = ToolExecutionResult( + content="raw output", + metadata={"sentinel": True}, + ) + context = SecurityContext( + tool_name="secure_tool", + tool_category=ToolCategory.FILE_SYSTEM, + action_type="code:write", + ) + scan_exec = await invoker._scan_output(tool_call, tool_exec_result, context) + assert scan_exec.metadata.get("output_scan_failed") is True + assert scan_exec.metadata.get("sentinel") is True + assert "output_withheld" not in scan_exec.metadata diff --git a/tests/unit/tools/test_invoker_security.py b/tests/unit/tools/test_invoker_security.py index db8bfec0e9..a38e024350 100644 --- a/tests/unit/tools/test_invoker_security.py +++ b/tests/unit/tools/test_invoker_security.py @@ -10,6 +10,7 @@ from ai_company.providers.models import ToolCall from ai_company.security.models import ( OutputScanResult, + ScanOutcome, SecurityContext, SecurityVerdict, SecurityVerdictType, @@ -51,6 +52,38 @@ async def execute( ) +class _FailingSecurityTool(_SecurityTestTool): + """Tool that raises RuntimeError from execute.""" + + def __init__(self) -> None: + super().__init__(name="failing_tool") + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + msg = "intentional failure" + raise RuntimeError(msg) + + +class _SoftErrorSecurityTool(_SecurityTestTool): + """Tool that returns is_error=True with sensitive content.""" + + def __init__(self) -> None: + super().__init__(name="soft_error_tool") + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + return ToolExecutionResult( + is_error=True, + content="error: API_KEY=AKIA1234567890EXAMPLE", + ) + + # ── Helpers ────────────────────────────────────────────────────── _NOW = datetime.now(UTC) @@ -333,6 +366,7 @@ async def test_sensitive_output_is_redacted( has_sensitive_data=True, findings=("API key detected",), redacted_content="executed: [REDACTED]", + outcome=ScanOutcome.REDACTED, ), ) invoker = ToolInvoker( @@ -377,18 +411,19 @@ async def test_scan_output_called_after_successful_execution( await invoker.invoke(tool_call) interceptor.scan_output.assert_awaited_once() - async def test_sensitive_but_no_redacted_content_fails_closed( + async def test_withheld_outcome_returns_policy_message( self, security_registry: ToolRegistry, tool_call: ToolCall, ) -> None: - """If has_sensitive_data=True but redacted_content is None, fail-closed.""" + """WITHHELD outcome returns explicit policy message (not fail-closed).""" interceptor = _make_interceptor( pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), scan_result=OutputScanResult( has_sensitive_data=True, findings=("potential leak",), redacted_content=None, + outcome=ScanOutcome.WITHHELD, ), ) invoker = ToolInvoker( @@ -397,7 +432,7 @@ async def test_sensitive_but_no_redacted_content_fails_closed( ) result = await invoker.invoke(tool_call) assert result.is_error is True - assert "fail-closed" in result.content.lower() + assert "withheld by security policy" in result.content.lower() assert "executed:" not in result.content @@ -571,49 +606,6 @@ async def test_scan_output_context_matches_pre_tool_context( assert pre_ctx.task_id == scan_ctx.task_id -# ── Helper tools for gap tests ─────────────────────────────────── - - -class _SecurityFailingTool(BaseTool): - """Tool that raises RuntimeError from execute.""" - - def __init__(self) -> None: - super().__init__( - name="failing_tool", - description="Tool that always fails", - category=ToolCategory.FILE_SYSTEM, - ) - - async def execute( - self, - *, - arguments: dict[str, Any], - ) -> ToolExecutionResult: - msg = "intentional failure" - raise RuntimeError(msg) - - -class _SecuritySoftErrorTool(BaseTool): - """Tool that returns is_error=True with sensitive content.""" - - def __init__(self) -> None: - super().__init__( - name="soft_error_tool", - description="Tool returning soft error", - category=ToolCategory.FILE_SYSTEM, - ) - - async def execute( - self, - *, - arguments: dict[str, Any], - ) -> ToolExecutionResult: - return ToolExecutionResult( - is_error=True, - content="error: API_KEY=AKIA1234567890EXAMPLE", - ) - - # ── Gap 1: Non-recoverable errors from scan propagate ──────────── @@ -656,8 +648,7 @@ class TestOutputScanSkippedOnToolError: """When tool.execute() raises, scan_output is not called.""" async def test_tool_execution_error_skips_output_scan(self) -> None: - failing_tool = _SecurityFailingTool() - registry = ToolRegistry([failing_tool]) + registry = ToolRegistry([_FailingSecurityTool()]) interceptor = _make_interceptor( pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), ) @@ -740,6 +731,7 @@ async def test_invoke_all_with_redaction( has_sensitive_data=True, findings=("secret",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ), ) invoker = ToolInvoker( @@ -766,12 +758,12 @@ class TestOutputScanOnSoftError: async def test_soft_error_content_is_scanned(self) -> None: """When tool returns is_error=True, scan_output is still called.""" - soft_tool = _SecuritySoftErrorTool() - registry = ToolRegistry([soft_tool]) + registry = ToolRegistry([_SoftErrorSecurityTool()]) scan_result = OutputScanResult( has_sensitive_data=True, findings=("API key",), redacted_content="error: [REDACTED]", + outcome=ScanOutcome.REDACTED, ) interceptor = _make_interceptor( pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), @@ -791,8 +783,7 @@ async def test_soft_error_content_is_scanned(self) -> None: async def test_soft_error_scan_receives_error_content(self) -> None: """Verify scan_output receives the error content string.""" - soft_tool = _SecuritySoftErrorTool() - registry = ToolRegistry([soft_tool]) + registry = ToolRegistry([_SoftErrorSecurityTool()]) interceptor = _make_interceptor( pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), )