From 652f31cb382250f79443c37fbb8fa13029444cb1 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 14 Mar 2026 14:53:10 +0100 Subject: [PATCH 1/6] feat: add explicit ScanOutcome signal to OutputScanResult (#284) Add a ScanOutcome enum (CLEAN/REDACTED/WITHHELD/LOG_ONLY) to OutputScanResult so the ToolInvoker can distinguish intentional policy withholding from scanner failure. Previously both produced has_sensitive_data=True + redacted_content=None, leading to a misleading "no redacted content available" log message. - ScanOutcome enum in security/models.py with consistency validation - OutputScanner sets REDACTED, WithholdPolicy sets WITHHELD, LogOnlyPolicy sets LOG_ONLY on sensitive results - ToolInvoker branches on WITHHELD first with dedicated event (TOOL_OUTPUT_WITHHELD) and clear user-facing message - 5 new model validation tests, 2 new invoker tests, outcome assertions added across all affected test files --- src/ai_company/observability/events/tool.py | 1 + src/ai_company/security/__init__.py | 2 + src/ai_company/security/models.py | 33 +++++++++ src/ai_company/security/output_scan_policy.py | 18 ++--- src/ai_company/security/output_scanner.py | 3 +- src/ai_company/tools/invoker.py | 21 +++++- tests/unit/security/test_models.py | 49 +++++++++++++ .../unit/security/test_output_scan_policy.py | 11 ++- tests/unit/security/test_output_scanner.py | 3 + tests/unit/security/test_service.py | 11 +++ tests/unit/tools/test_invoker_security.py | 72 ++++++++++++++++++- 11 files changed, 210 insertions(+), 14 deletions(-) diff --git a/src/ai_company/observability/events/tool.py b/src/ai_company/observability/events/tool.py index e5082281d0..aa980ce30c 100644 --- a/src/ai_company/observability/events/tool.py +++ b/src/ai_company/observability/events/tool.py @@ -44,6 +44,7 @@ TOOL_SECURITY_DENIED: Final[str] = "tool.security.denied" TOOL_SECURITY_ESCALATED: Final[str] = "tool.security.escalated" TOOL_OUTPUT_REDACTED: Final[str] = "tool.output.redacted" +TOOL_OUTPUT_WITHHELD: Final[str] = "tool.output.withheld" # ── Subprocess utility events ─────────────────────────────────── TOOL_SUBPROCESS_TRANSPORT_CLOSE_FAILED: Final[str] = ( diff --git a/src/ai_company/security/__init__.py b/src/ai_company/security/__init__.py index df3ce3e383..a55b88a773 100644 --- a/src/ai_company/security/__init__.py +++ b/src/ai_company/security/__init__.py @@ -32,6 +32,7 @@ from ai_company.security.models import ( AuditEntry, OutputScanResult, + ScanOutcome, SecurityContext, SecurityVerdict, SecurityVerdictType, @@ -66,6 +67,7 @@ "RedactPolicy", "RuleEngine", "RuleEngineConfig", + "ScanOutcome", "SecOpsService", "SecurityConfig", "SecurityContext", diff --git a/src/ai_company/security/models.py b/src/ai_company/security/models.py index 79b914a05a..2af0ddf08e 100644 --- a/src/ai_company/security/models.py +++ b/src/ai_company/security/models.py @@ -20,6 +20,26 @@ from ai_company.core.types import NotBlankStr # noqa: TC001 +class ScanOutcome(StrEnum): + """Outcome of an output scan policy decision. + + Tracks what the scanner/policy *did* with the output so that + downstream consumers (e.g. ``ToolInvoker``) can distinguish + intentional withholding from scanner failure. + + Members: + CLEAN: No sensitive data detected (default). + REDACTED: Sensitive data found, redacted content available. + WITHHELD: Content intentionally withheld by policy. + LOG_ONLY: Findings logged, original content passed through. + """ + + CLEAN = "clean" + REDACTED = "redacted" + WITHHELD = "withheld" + LOG_ONLY = "log_only" + + class SecurityVerdictType(StrEnum): """Security verdict constants. @@ -156,6 +176,9 @@ class OutputScanResult(BaseModel): has_sensitive_data: Whether sensitive data was detected. findings: Descriptions of findings. redacted_content: Content with sensitive data replaced, or None. + outcome: What the scanner/policy did with the output. + Allows downstream consumers to distinguish intentional + withholding from scanner failure. """ model_config = ConfigDict(frozen=True) @@ -163,6 +186,7 @@ class OutputScanResult(BaseModel): has_sensitive_data: bool = False findings: tuple[NotBlankStr, ...] = () redacted_content: str | None = None + outcome: ScanOutcome = ScanOutcome.CLEAN @model_validator(mode="after") def _check_consistency(self) -> OutputScanResult: @@ -174,4 +198,13 @@ def _check_consistency(self) -> OutputScanResult: if self.redacted_content is not None: msg = "redacted_content must be None when has_sensitive_data is False" raise ValueError(msg) + if self.outcome in (ScanOutcome.REDACTED, ScanOutcome.WITHHELD): + msg = ( + f"outcome={self.outcome.value!r} is invalid when " + "has_sensitive_data is False" + ) + raise ValueError(msg) + elif self.outcome == ScanOutcome.CLEAN: + msg = "outcome='clean' is invalid when has_sensitive_data is True" + raise ValueError(msg) return self diff --git a/src/ai_company/security/output_scan_policy.py b/src/ai_company/security/output_scan_policy.py index 11e3baca2e..3b1c169b23 100644 --- a/src/ai_company/security/output_scan_policy.py +++ b/src/ai_company/security/output_scan_policy.py @@ -14,7 +14,7 @@ from ai_company.observability.events.security import ( SECURITY_OUTPUT_SCAN_POLICY_APPLIED, ) -from ai_company.security.models import OutputScanResult +from ai_company.security.models import OutputScanResult, ScanOutcome if TYPE_CHECKING: from collections.abc import Mapping @@ -127,7 +127,9 @@ def apply( ) if not scan_result.has_sensitive_data: return scan_result - return scan_result.model_copy(update={"redacted_content": None}) + return scan_result.model_copy( + update={"redacted_content": None, "outcome": ScanOutcome.WITHHELD}, + ) class LogOnlyPolicy: @@ -172,12 +174,12 @@ def apply( agent_id=context.agent_id, note="Sensitive data detected but passed through by log_only policy", ) - else: - logger.debug( - SECURITY_OUTPUT_SCAN_POLICY_APPLIED, - policy="log_only", - has_sensitive_data=False, - ) + return OutputScanResult(outcome=ScanOutcome.LOG_ONLY) + logger.debug( + SECURITY_OUTPUT_SCAN_POLICY_APPLIED, + policy="log_only", + has_sensitive_data=False, + ) return OutputScanResult() diff --git a/src/ai_company/security/output_scanner.py b/src/ai_company/security/output_scanner.py index 0cf4797bcf..64523cf841 100644 --- a/src/ai_company/security/output_scanner.py +++ b/src/ai_company/security/output_scanner.py @@ -13,7 +13,7 @@ SECURITY_OUTPUT_SCAN_FINDING, SECURITY_OUTPUT_SCAN_START, ) -from ai_company.security.models import OutputScanResult +from ai_company.security.models import OutputScanResult, ScanOutcome from ai_company.security.rules.credential_detector import CREDENTIAL_PATTERNS from ai_company.security.rules.data_leak_detector import PII_PATTERNS @@ -67,4 +67,5 @@ def scan(self, output: str) -> OutputScanResult: has_sensitive_data=True, findings=tuple(sorted(set(findings))), redacted_content=redacted, + outcome=ScanOutcome.REDACTED, ) diff --git a/src/ai_company/tools/invoker.py b/src/ai_company/tools/invoker.py index e9ab6b27b0..451b47d2fb 100644 --- a/src/ai_company/tools/invoker.py +++ b/src/ai_company/tools/invoker.py @@ -36,12 +36,13 @@ TOOL_INVOKE_TOOL_ERROR, TOOL_INVOKE_VALIDATION_UNEXPECTED, TOOL_OUTPUT_REDACTED, + TOOL_OUTPUT_WITHHELD, TOOL_PERMISSION_DENIED, TOOL_SECURITY_DENIED, TOOL_SECURITY_ESCALATED, ) from ai_company.providers.models import ToolCall, ToolResult -from ai_company.security.models import SecurityContext, SecurityVerdictType +from ai_company.security.models import ScanOutcome, SecurityContext, SecurityVerdictType from .base import ToolExecutionResult from .errors import ToolExecutionError, ToolNotFoundError, ToolParameterError @@ -307,6 +308,21 @@ async def _scan_output( ) if scan_result.has_sensitive_data: + if scan_result.outcome == ScanOutcome.WITHHELD: + logger.warning( + TOOL_OUTPUT_WITHHELD, + tool_call_id=tool_call.id, + tool_name=tool_call.name, + findings=scan_result.findings, + note="content withheld by security policy", + ) + return ToolExecutionResult( + content=( + "Sensitive data detected — content withheld by security policy." + ), + is_error=True, + metadata={"output_withheld": True}, + ) if scan_result.redacted_content is not None: logger.warning( TOOL_OUTPUT_REDACTED, @@ -323,7 +339,8 @@ async def _scan_output( "redaction_findings": list(scan_result.findings), }, ) - # Sensitive data found but no redaction available — fail closed. + # Defensive: REDACTED outcome but redacted_content is None + # (shouldn't happen). logger.warning( TOOL_OUTPUT_REDACTED, tool_call_id=tool_call.id, diff --git a/tests/unit/security/test_models.py b/tests/unit/security/test_models.py index b8cabe1250..98fd35d038 100644 --- a/tests/unit/security/test_models.py +++ b/tests/unit/security/test_models.py @@ -9,6 +9,7 @@ from ai_company.security.models import ( AuditEntry, OutputScanResult, + ScanOutcome, SecurityContext, SecurityVerdict, SecurityVerdictType, @@ -459,18 +460,21 @@ def test_defaults(self) -> None: assert result.has_sensitive_data is False assert result.findings == () assert result.redacted_content is None + assert result.outcome == ScanOutcome.CLEAN def test_with_findings(self) -> None: result = OutputScanResult( has_sensitive_data=True, findings=("API key detected", "Email address found"), redacted_content="content with [REDACTED]", + outcome=ScanOutcome.REDACTED, ) assert result.has_sensitive_data is True assert len(result.findings) == 2 assert result.findings[0] == "API key detected" assert result.findings[1] == "Email address found" assert result.redacted_content == "content with [REDACTED]" + assert result.outcome == ScanOutcome.REDACTED def test_frozen(self) -> None: result = OutputScanResult() @@ -512,7 +516,52 @@ def test_json_roundtrip(self) -> None: has_sensitive_data=True, findings=("PII detected",), redacted_content="safe output", + outcome=ScanOutcome.REDACTED, ) json_str = result.model_dump_json() restored = OutputScanResult.model_validate_json(json_str) assert restored == result + + def test_outcome_clean_rejected_when_sensitive(self) -> None: + """outcome=CLEAN is invalid when has_sensitive_data=True.""" + with pytest.raises(ValidationError, match="outcome"): + OutputScanResult( + has_sensitive_data=True, + findings=("leak",), + outcome=ScanOutcome.CLEAN, + ) + + def test_outcome_redacted_rejected_when_not_sensitive(self) -> None: + """outcome=REDACTED is invalid when has_sensitive_data=False.""" + with pytest.raises(ValidationError, match="outcome"): + OutputScanResult( + has_sensitive_data=False, + outcome=ScanOutcome.REDACTED, + ) + + def test_outcome_withheld_rejected_when_not_sensitive(self) -> None: + """outcome=WITHHELD is invalid when has_sensitive_data=False.""" + with pytest.raises(ValidationError, match="outcome"): + OutputScanResult( + has_sensitive_data=False, + outcome=ScanOutcome.WITHHELD, + ) + + def test_outcome_log_only_accepted_when_not_sensitive(self) -> None: + """outcome=LOG_ONLY is valid with has_sensitive_data=False.""" + result = OutputScanResult( + has_sensitive_data=False, + outcome=ScanOutcome.LOG_ONLY, + ) + assert result.outcome == ScanOutcome.LOG_ONLY + + def test_outcome_withheld_valid(self) -> None: + """outcome=WITHHELD is valid with has_sensitive_data=True.""" + result = OutputScanResult( + has_sensitive_data=True, + findings=("secret",), + outcome=ScanOutcome.WITHHELD, + redacted_content=None, + ) + assert result.outcome == ScanOutcome.WITHHELD + assert result.redacted_content is None diff --git a/tests/unit/security/test_output_scan_policy.py b/tests/unit/security/test_output_scan_policy.py index 2341d4611e..932f1cb05b 100644 --- a/tests/unit/security/test_output_scan_policy.py +++ b/tests/unit/security/test_output_scan_policy.py @@ -4,7 +4,7 @@ from ai_company.core.enums import AutonomyLevel, ToolCategory from ai_company.security.autonomy.models import EffectiveAutonomy -from ai_company.security.models import OutputScanResult, SecurityContext +from ai_company.security.models import OutputScanResult, ScanOutcome, SecurityContext from ai_company.security.output_scan_policy import ( _DEFAULT_AUTONOMY_POLICY_MAP, AutonomyTieredPolicy, @@ -36,6 +36,7 @@ def _sensitive_result() -> OutputScanResult: has_sensitive_data=True, findings=("API key detected",), redacted_content="output with [REDACTED]", + outcome=ScanOutcome.REDACTED, ) @@ -58,6 +59,7 @@ def test_sensitive_result_passes_through(self) -> None: assert transformed == result assert transformed.redacted_content == "output with [REDACTED]" + assert transformed.outcome == ScanOutcome.REDACTED def test_clean_result_passes_through(self) -> None: policy = RedactPolicy() @@ -84,6 +86,7 @@ def test_sensitive_result_clears_redacted_content(self) -> None: assert transformed.has_sensitive_data is True assert transformed.redacted_content is None + assert transformed.outcome == ScanOutcome.WITHHELD # Original result is not mutated (immutability contract). assert result.redacted_content == "output with [REDACTED]" @@ -120,6 +123,7 @@ def test_sensitive_result_returns_empty(self) -> None: assert transformed.has_sensitive_data is False assert transformed.findings == () assert transformed.redacted_content is None + assert transformed.outcome == ScanOutcome.LOG_ONLY def test_clean_result_returns_empty(self) -> None: policy = LogOnlyPolicy() @@ -128,6 +132,7 @@ def test_clean_result_returns_empty(self) -> None: transformed = policy.apply(result, _make_context()) assert transformed == OutputScanResult() + assert transformed.outcome == ScanOutcome.CLEAN # ── TestAutonomyTieredPolicy ───────────────────────────────────── @@ -154,8 +159,9 @@ def test_full_autonomy_uses_log_only(self) -> None: transformed = policy.apply(result, _make_context()) - # LogOnlyPolicy returns empty result. + # LogOnlyPolicy returns empty result with LOG_ONLY outcome. assert transformed.has_sensitive_data is False + assert transformed.outcome == ScanOutcome.LOG_ONLY def test_semi_autonomy_uses_redact(self) -> None: """SEMI level delegates to RedactPolicy (default map).""" @@ -179,6 +185,7 @@ def test_locked_autonomy_uses_withhold(self) -> None: # WithholdPolicy clears redacted_content. assert transformed.has_sensitive_data is True assert transformed.redacted_content is None + assert transformed.outcome == ScanOutcome.WITHHELD def test_no_autonomy_falls_back_to_redact(self) -> None: """When effective_autonomy is None, falls back to RedactPolicy.""" diff --git a/tests/unit/security/test_output_scanner.py b/tests/unit/security/test_output_scanner.py index ca4014d59a..3781b43b7f 100644 --- a/tests/unit/security/test_output_scanner.py +++ b/tests/unit/security/test_output_scanner.py @@ -2,6 +2,7 @@ import pytest +from ai_company.security.models import ScanOutcome from ai_company.security.output_scanner import OutputScanner pytestmark = pytest.mark.timeout(30) @@ -27,6 +28,7 @@ def test_clean_text_no_findings(self) -> None: assert result.has_sensitive_data is False assert result.findings == () assert result.redacted_content is None + assert result.outcome == ScanOutcome.CLEAN def test_empty_string_no_findings(self) -> None: result = _scanner().scan("") @@ -73,6 +75,7 @@ def test_credential_detected(self, label: str, text: str) -> None: assert result.has_sensitive_data is True assert len(result.findings) >= 1 + assert result.outcome == ScanOutcome.REDACTED def test_aws_key_in_findings(self) -> None: result = _scanner().scan("AKIAIOSFODNN7EXAMPLE is the key") diff --git a/tests/unit/security/test_service.py b/tests/unit/security/test_service.py index 5cab6ade43..0f72d031db 100644 --- a/tests/unit/security/test_service.py +++ b/tests/unit/security/test_service.py @@ -17,6 +17,7 @@ from ai_company.security.config import SecurityConfig from ai_company.security.models import ( OutputScanResult, + ScanOutcome, SecurityContext, SecurityVerdict, SecurityVerdictType, @@ -343,6 +344,7 @@ async def test_scan_delegates_to_scanner(self) -> None: has_sensitive_data=True, findings=("provider access key",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = _make_service(scan_result=finding_result) ctx = _make_context() @@ -366,6 +368,7 @@ async def test_scan_records_audit_on_findings(self) -> None: has_sensitive_data=True, findings=("Bearer token",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = _make_service(scan_result=finding_result) ctx = _make_context() @@ -641,6 +644,7 @@ async def test_scan_audit_failure_still_returns_result(self) -> None: has_sensitive_data=True, findings=("API key detected",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_failing_audit(finding) ctx = _make_context() @@ -656,6 +660,7 @@ async def test_scan_audit_failure_does_not_propagate(self) -> None: has_sensitive_data=True, findings=("secret",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_failing_audit(finding) ctx = _make_context() @@ -702,6 +707,7 @@ async def test_policy_applied_after_scan(self) -> None: has_sensitive_data=True, findings=("token",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_policy( scan_result=finding, @@ -722,6 +728,7 @@ async def test_policy_transforms_result(self) -> None: has_sensitive_data=True, findings=("key",), redacted_content="redacted output", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_policy( scan_result=finding, @@ -759,6 +766,7 @@ async def test_default_policy_from_config_passes_through(self) -> None: has_sensitive_data=True, findings=("secret",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_policy( scan_result=finding, @@ -780,6 +788,7 @@ async def test_policy_failure_returns_raw_scan_result(self) -> None: has_sensitive_data=True, findings=("token",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_policy( scan_result=finding, @@ -812,6 +821,7 @@ async def test_non_recoverable_policy_errors_propagate( has_sensitive_data=True, findings=("key",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_policy( scan_result=finding, @@ -830,6 +840,7 @@ async def test_audit_preserves_findings_before_policy_clears_them( has_sensitive_data=True, findings=("Bearer token",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ) service = self._make_service_with_policy( scan_result=finding, diff --git a/tests/unit/tools/test_invoker_security.py b/tests/unit/tools/test_invoker_security.py index db8bfec0e9..c9847a574e 100644 --- a/tests/unit/tools/test_invoker_security.py +++ b/tests/unit/tools/test_invoker_security.py @@ -10,6 +10,7 @@ from ai_company.providers.models import ToolCall from ai_company.security.models import ( OutputScanResult, + ScanOutcome, SecurityContext, SecurityVerdict, SecurityVerdictType, @@ -333,6 +334,7 @@ async def test_sensitive_output_is_redacted( has_sensitive_data=True, findings=("API key detected",), redacted_content="executed: [REDACTED]", + outcome=ScanOutcome.REDACTED, ), ) invoker = ToolInvoker( @@ -389,6 +391,7 @@ async def test_sensitive_but_no_redacted_content_fails_closed( has_sensitive_data=True, findings=("potential leak",), redacted_content=None, + outcome=ScanOutcome.WITHHELD, ), ) invoker = ToolInvoker( @@ -397,7 +400,7 @@ async def test_sensitive_but_no_redacted_content_fails_closed( ) result = await invoker.invoke(tool_call) assert result.is_error is True - assert "fail-closed" in result.content.lower() + assert "withheld by security policy" in result.content.lower() assert "executed:" not in result.content @@ -740,6 +743,7 @@ async def test_invoke_all_with_redaction( has_sensitive_data=True, findings=("secret",), redacted_content="[REDACTED]", + outcome=ScanOutcome.REDACTED, ), ) invoker = ToolInvoker( @@ -772,6 +776,7 @@ async def test_soft_error_content_is_scanned(self) -> None: has_sensitive_data=True, findings=("API key",), redacted_content="error: [REDACTED]", + outcome=ScanOutcome.REDACTED, ) interceptor = _make_interceptor( pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), @@ -806,3 +811,68 @@ async def test_soft_error_scan_receives_error_content(self) -> None: scan_args = interceptor.scan_output.call_args[0] assert scan_args[1] == "error: API_KEY=AKIA1234567890EXAMPLE" + + +# ── Withheld outcome tests ───────────────────────────────────── + + +@pytest.mark.unit +class TestWithheldOutcome: + """Tests for the WITHHELD scan outcome path in the invoker.""" + + async def test_withheld_outcome_returns_policy_message( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """Withheld outcome returns explicit policy message.""" + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=OutputScanResult( + has_sensitive_data=True, + findings=("secret token",), + redacted_content=None, + outcome=ScanOutcome.WITHHELD, + ), + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + result = await invoker.invoke(tool_call) + assert result.is_error is True + assert "withheld by security policy" in result.content.lower() + + async def test_withheld_metadata_uses_output_withheld_key( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """Withheld outcome sets output_withheld metadata, not output_scan_failed.""" + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=OutputScanResult( + has_sensitive_data=True, + findings=("credential",), + redacted_content=None, + outcome=ScanOutcome.WITHHELD, + ), + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + # Access the internal _scan_output to inspect metadata. + from ai_company.tools.base import ToolExecutionResult + + tool_exec_result = ToolExecutionResult(content="raw output") + from ai_company.security.models import SecurityContext + + context = SecurityContext( + tool_name="secure_tool", + tool_category=ToolCategory.FILE_SYSTEM, + action_type="code:write", + ) + scan_exec = await invoker._scan_output(tool_call, tool_exec_result, context) + assert scan_exec.metadata.get("output_withheld") is True + assert "output_scan_failed" not in scan_exec.metadata From fbbcf959a001e48f11472b34c55ad918822a4e06 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 14 Mar 2026 15:12:49 +0100 Subject: [PATCH 2/6] refactor: address pre-PR review findings for ScanOutcome Pre-reviewed by 14 agents, 12 findings addressed: - Tighten OutputScanResult validator: REDACTED requires redacted_content, LOG_ONLY requires has_sensitive_data=False - Extract _handle_sensitive_scan from _scan_output (50-line limit) - Rewrite _scan_output docstring to cover all outcome paths - Add TOOL_OUTPUT_WITHHELD to CLAUDE.md event catalog - Update design spec Output Scan Response Policies with ScanOutcome - Add tests: LOG_ONLY pass-through, WITHHELD priority over redacted_content, new model validation constraints - Remove redundant in-function imports in test --- CLAUDE.md | 2 +- docs/design/operations.md | 25 +++-- src/ai_company/security/models.py | 6 + src/ai_company/tools/invoker.py | 131 +++++++++++++--------- tests/unit/security/test_models.py | 19 ++++ tests/unit/tools/test_invoker_security.py | 57 +++++++++- 6 files changed, 173 insertions(+), 67 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 530bb71a2a..e6dbbea9cf 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -151,7 +151,7 @@ web/ # Vue 3 + PrimeVue + Tailwind CSS dashboard - **Every module** with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)` - **Never** use `import logging` / `logging.getLogger()` / `print()` in application code - **Variable name**: always `logger` (not `_logger`, not `log`) -- **Event names**: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`, `CFO_ANOMALY_DETECTED` from `events.cfo`, `CONFLICT_DETECTED` from `events.conflict`, `MEETING_STARTED` from `events.meeting`, `CLASSIFICATION_START` from `events.classification`, `CONSOLIDATION_START` from `events.consolidation`, `ORG_MEMORY_QUERY_START` from `events.org_memory`, `API_REQUEST_STARTED` from `events.api`, `API_ROUTE_NOT_FOUND` from `events.api`, `CODE_RUNNER_EXECUTE_START` from `events.code_runner`, `DOCKER_EXECUTE_START` from `events.docker`, `MCP_INVOKE_START` from `events.mcp`, `SECURITY_EVALUATE_START` from `events.security`, `HR_HIRING_REQUEST_CREATED` from `events.hr`, `PERF_METRIC_RECORDED` from `events.performance`, `TRUST_EVALUATE_START` from `events.trust`, `PROMOTION_EVALUATE_START` from `events.promotion`, `PROMPT_BUILD_START` from `events.prompt`, `MEMORY_RETRIEVAL_START` from `events.memory`, `MEMORY_BACKEND_CONNECTED` from `events.memory`, `MEMORY_ENTRY_STORED` from `events.memory`, `MEMORY_BACKEND_SYSTEM_ERROR` from `events.memory`, `AUTONOMY_ACTION_AUTO_APPROVED` from `events.autonomy`, `TIMEOUT_POLICY_EVALUATED` from `events.timeout`, `PERSISTENCE_AUDIT_ENTRY_SAVED` from `events.persistence`, `TASK_ENGINE_STARTED` from `events.task_engine`, `COORDINATION_STARTED` from `events.coordination`, `COMMUNICATION_DISPATCH_START` from `events.communication`, `COMPANY_STARTED` from `events.company`, `CONFIG_LOADED` from `events.config`, `CORRELATION_ID_CREATED` from `events.correlation`, `DECOMPOSITION_STARTED` from `events.decomposition`, `DELEGATION_STARTED` from `events.delegation`, `EXECUTION_LOOP_START` from `events.execution`, `CHECKPOINT_SAVED` from `events.checkpoint`, `PERSISTENCE_CHECKPOINT_SAVED` from `events.persistence`, `GIT_OPERATION_START` from `events.git`, `PARALLEL_GROUP_START` from `events.parallel`, `PERSONALITY_LOADED` from `events.personality`, `QUOTA_CHECKED` from `events.quota`, `ROLE_ASSIGNED` from `events.role`, `ROUTING_STARTED` from `events.routing`, `SANDBOX_EXECUTE_START` from `events.sandbox`, `TASK_CREATED` from `events.task`, `TASK_ASSIGNMENT_STARTED` from `events.task_assignment`, `TASK_ROUTING_STARTED` from `events.task_routing`, `TEMPLATE_LOADED` from `events.template`, `TOOL_INVOKE_START` from `events.tool`, `WORKSPACE_CREATED` from `events.workspace`, `APPROVAL_GATE_ESCALATION_DETECTED` from `events.approval_gate`, `APPROVAL_GATE_ESCALATION_FAILED` from `events.approval_gate`, `APPROVAL_GATE_INITIALIZED` from `events.approval_gate`, `APPROVAL_GATE_RISK_CLASSIFIED` from `events.approval_gate`, `APPROVAL_GATE_RISK_CLASSIFY_FAILED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_PARKED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_PARK_FAILED` from `events.approval_gate`, `APPROVAL_GATE_PARK_TASKLESS` from `events.approval_gate`, `APPROVAL_GATE_RESUME_STARTED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_RESUMED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_FAILED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_DELETE_FAILED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_TRIGGERED` from `events.approval_gate`, `APPROVAL_GATE_NO_PARKED_CONTEXT` from `events.approval_gate`, `APPROVAL_GATE_LOOP_WIRING_WARNING` from `events.approval_gate`). Import directly: `from ai_company.observability.events. import EVENT_CONSTANT` +- **Event names**: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`, `CFO_ANOMALY_DETECTED` from `events.cfo`, `CONFLICT_DETECTED` from `events.conflict`, `MEETING_STARTED` from `events.meeting`, `CLASSIFICATION_START` from `events.classification`, `CONSOLIDATION_START` from `events.consolidation`, `ORG_MEMORY_QUERY_START` from `events.org_memory`, `API_REQUEST_STARTED` from `events.api`, `API_ROUTE_NOT_FOUND` from `events.api`, `CODE_RUNNER_EXECUTE_START` from `events.code_runner`, `DOCKER_EXECUTE_START` from `events.docker`, `MCP_INVOKE_START` from `events.mcp`, `SECURITY_EVALUATE_START` from `events.security`, `HR_HIRING_REQUEST_CREATED` from `events.hr`, `PERF_METRIC_RECORDED` from `events.performance`, `TRUST_EVALUATE_START` from `events.trust`, `PROMOTION_EVALUATE_START` from `events.promotion`, `PROMPT_BUILD_START` from `events.prompt`, `MEMORY_RETRIEVAL_START` from `events.memory`, `MEMORY_BACKEND_CONNECTED` from `events.memory`, `MEMORY_ENTRY_STORED` from `events.memory`, `MEMORY_BACKEND_SYSTEM_ERROR` from `events.memory`, `AUTONOMY_ACTION_AUTO_APPROVED` from `events.autonomy`, `TIMEOUT_POLICY_EVALUATED` from `events.timeout`, `PERSISTENCE_AUDIT_ENTRY_SAVED` from `events.persistence`, `TASK_ENGINE_STARTED` from `events.task_engine`, `COORDINATION_STARTED` from `events.coordination`, `COMMUNICATION_DISPATCH_START` from `events.communication`, `COMPANY_STARTED` from `events.company`, `CONFIG_LOADED` from `events.config`, `CORRELATION_ID_CREATED` from `events.correlation`, `DECOMPOSITION_STARTED` from `events.decomposition`, `DELEGATION_STARTED` from `events.delegation`, `EXECUTION_LOOP_START` from `events.execution`, `CHECKPOINT_SAVED` from `events.checkpoint`, `PERSISTENCE_CHECKPOINT_SAVED` from `events.persistence`, `GIT_OPERATION_START` from `events.git`, `PARALLEL_GROUP_START` from `events.parallel`, `PERSONALITY_LOADED` from `events.personality`, `QUOTA_CHECKED` from `events.quota`, `ROLE_ASSIGNED` from `events.role`, `ROUTING_STARTED` from `events.routing`, `SANDBOX_EXECUTE_START` from `events.sandbox`, `TASK_CREATED` from `events.task`, `TASK_ASSIGNMENT_STARTED` from `events.task_assignment`, `TASK_ROUTING_STARTED` from `events.task_routing`, `TEMPLATE_LOADED` from `events.template`, `TOOL_INVOKE_START` from `events.tool`, `TOOL_OUTPUT_WITHHELD` from `events.tool`, `WORKSPACE_CREATED` from `events.workspace`, `APPROVAL_GATE_ESCALATION_DETECTED` from `events.approval_gate`, `APPROVAL_GATE_ESCALATION_FAILED` from `events.approval_gate`, `APPROVAL_GATE_INITIALIZED` from `events.approval_gate`, `APPROVAL_GATE_RISK_CLASSIFIED` from `events.approval_gate`, `APPROVAL_GATE_RISK_CLASSIFY_FAILED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_PARKED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_PARK_FAILED` from `events.approval_gate`, `APPROVAL_GATE_PARK_TASKLESS` from `events.approval_gate`, `APPROVAL_GATE_RESUME_STARTED` from `events.approval_gate`, `APPROVAL_GATE_CONTEXT_RESUMED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_FAILED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_DELETE_FAILED` from `events.approval_gate`, `APPROVAL_GATE_RESUME_TRIGGERED` from `events.approval_gate`, `APPROVAL_GATE_NO_PARKED_CONTEXT` from `events.approval_gate`, `APPROVAL_GATE_LOOP_WIRING_WARNING` from `events.approval_gate`). Import directly: `from ai_company.observability.events. import EVENT_CONSTANT` - **Structured kwargs**: always `logger.info(EVENT, key=value)` — never `logger.info("msg %s", val)` - **All error paths** must log at WARNING or ERROR with context before raising - **All state transitions** must log at INFO diff --git a/docs/design/operations.md b/docs/design/operations.md index bea6f225e0..8404cf1e6d 100644 --- a/docs/design/operations.md +++ b/docs/design/operations.md @@ -787,14 +787,23 @@ execution. Post-tool-call scanning detects sensitive data in outputs. ### Output Scan Response Policies After the output scanner detects sensitive data, a pluggable `OutputScanResponsePolicy` -protocol decides how to handle the findings: - -| Policy | Behavior | Default for | -|--------|----------|-------------| -| **Redact** (default) | Return scanner's redacted content as-is | `SEMI`, `SUPERVISED` autonomy | -| **Withhold** | Clear redacted content -- fail-closed, no partial data returned | `LOCKED` autonomy | -| **Log-only** | Discard findings (logs at WARNING), pass original output through | `FULL` autonomy | -| **Autonomy-tiered** | Delegate to a sub-policy based on effective autonomy level | Composite policy | +protocol decides how to handle the findings. Each policy sets a `ScanOutcome` enum on the +returned `OutputScanResult` so downstream consumers (primarily `ToolInvoker`) can +distinguish intentional policy decisions from scanner failures: + +| Policy | Behavior | `ScanOutcome` | Default for | +|--------|----------|---------------|-------------| +| **Redact** (default) | Return scanner's redacted content as-is | `REDACTED` | `SEMI`, `SUPERVISED` autonomy | +| **Withhold** | Clear redacted content — content withheld by policy | `WITHHELD` | `LOCKED` autonomy | +| **Log-only** | Discard findings (logs at WARNING), pass original output through | `LOG_ONLY` | `FULL` autonomy | +| **Autonomy-tiered** | Delegate to a sub-policy based on effective autonomy level | *(set by delegate)* | Composite policy | + +The `ScanOutcome` enum (`CLEAN`, `REDACTED`, `WITHHELD`, `LOG_ONLY`) is set by the scanner +(initial `REDACTED` when findings are detected) and may be transformed by the policy (e.g. +`WithholdPolicy` changes `REDACTED` → `WITHHELD`). The `ToolInvoker._scan_output` method +branches on `ScanOutcome.WITHHELD` first to return a dedicated error message ("content +withheld by security policy") with `output_withheld` metadata — distinct from the generic +fail-closed path used for scanner exceptions. Policy selection is declarative via `SecurityConfig.output_scan_policy_type` (`OutputScanPolicyType` enum). A factory function (`build_output_scan_policy`) resolves the diff --git a/src/ai_company/security/models.py b/src/ai_company/security/models.py index 2af0ddf08e..03aa819817 100644 --- a/src/ai_company/security/models.py +++ b/src/ai_company/security/models.py @@ -207,4 +207,10 @@ def _check_consistency(self) -> OutputScanResult: elif self.outcome == ScanOutcome.CLEAN: msg = "outcome='clean' is invalid when has_sensitive_data is True" raise ValueError(msg) + if self.outcome == ScanOutcome.REDACTED and self.redacted_content is None: + msg = "redacted_content must not be None when outcome is 'redacted'" + raise ValueError(msg) + if self.outcome == ScanOutcome.LOG_ONLY and self.has_sensitive_data: + msg = "outcome='log_only' is invalid when has_sensitive_data is True" + raise ValueError(msg) return self diff --git a/src/ai_company/tools/invoker.py b/src/ai_company/tools/invoker.py index 451b47d2fb..6a7f4102c7 100644 --- a/src/ai_company/tools/invoker.py +++ b/src/ai_company/tools/invoker.py @@ -42,7 +42,12 @@ TOOL_SECURITY_ESCALATED, ) from ai_company.providers.models import ToolCall, ToolResult -from ai_company.security.models import ScanOutcome, SecurityContext, SecurityVerdictType +from ai_company.security.models import ( + OutputScanResult, + ScanOutcome, + SecurityContext, + SecurityVerdictType, +) from .base import ToolExecutionResult from .errors import ToolExecutionError, ToolNotFoundError, ToolParameterError @@ -272,6 +277,67 @@ async def _check_security( is_error=True, ) + def _handle_sensitive_scan( + self, + tool_call: ToolCall, + result: ToolExecutionResult, + scan_result: OutputScanResult, + ) -> ToolExecutionResult: + """Route a sensitive scan result to the correct handler. + + Branches on ``ScanOutcome``: + + - ``WITHHELD``: return error with "withheld by policy" message. + - ``REDACTED`` with content: return redacted content. + - Defensive fallback: withhold output (fail-closed). + """ + if scan_result.outcome == ScanOutcome.WITHHELD: + logger.warning( + TOOL_OUTPUT_WITHHELD, + tool_call_id=tool_call.id, + tool_name=tool_call.name, + findings=scan_result.findings, + note="content withheld by security policy", + ) + return ToolExecutionResult( + content=( + "Sensitive data detected — content withheld by security policy." + ), + is_error=True, + metadata={"output_withheld": True}, + ) + if scan_result.redacted_content is not None: + logger.warning( + TOOL_OUTPUT_REDACTED, + tool_call_id=tool_call.id, + tool_name=tool_call.name, + findings=scan_result.findings, + ) + return ToolExecutionResult( + content=scan_result.redacted_content, + is_error=result.is_error, + metadata={ + **result.metadata, + "output_redacted": True, + "redaction_findings": list(scan_result.findings), + }, + ) + # Defensive: model validator prevents REDACTED with + # redacted_content=None, so this is only reachable via + # mocks or future outcome values. + logger.warning( + TOOL_OUTPUT_REDACTED, + tool_call_id=tool_call.id, + tool_name=tool_call.name, + findings=scan_result.findings, + note="no redacted content available — withholding output", + ) + return ToolExecutionResult( + content=("Sensitive data detected (fail-closed). Tool output withheld."), + is_error=True, + metadata={"output_scan_failed": True}, + ) + async def _scan_output( self, tool_call: ToolCall, @@ -280,10 +346,15 @@ async def _scan_output( ) -> ToolExecutionResult: """Scan tool output for sensitive data (if interceptor is set). - If sensitive data is found and redacted, returns a new - ``ToolExecutionResult`` with the redacted content. Exceptions - from the scanner are caught — the original result is returned - to avoid destroying valid tool output. + When sensitive data is detected, the scan result's ``outcome`` + determines the response: + + - ``WITHHELD``: error result with "withheld by policy" message. + - ``REDACTED``: redacted content replaces the original output. + - ``LOG_ONLY`` / ``CLEAN``: original output passes through. + + Scanner exceptions are caught and fail-closed — a generic error + result is returned to prevent leaking sensitive data. """ if self._security_interceptor is None: return result @@ -302,59 +373,13 @@ async def _scan_output( tool_name=tool_call.name, ) return ToolExecutionResult( - content=("Output scan failed (fail-closed). Tool output withheld."), + content="Output scan failed (fail-closed). Tool output withheld.", is_error=True, metadata={"output_scan_failed": True}, ) if scan_result.has_sensitive_data: - if scan_result.outcome == ScanOutcome.WITHHELD: - logger.warning( - TOOL_OUTPUT_WITHHELD, - tool_call_id=tool_call.id, - tool_name=tool_call.name, - findings=scan_result.findings, - note="content withheld by security policy", - ) - return ToolExecutionResult( - content=( - "Sensitive data detected — content withheld by security policy." - ), - is_error=True, - metadata={"output_withheld": True}, - ) - if scan_result.redacted_content is not None: - logger.warning( - TOOL_OUTPUT_REDACTED, - tool_call_id=tool_call.id, - tool_name=tool_call.name, - findings=scan_result.findings, - ) - return ToolExecutionResult( - content=scan_result.redacted_content, - is_error=result.is_error, - metadata={ - **result.metadata, - "output_redacted": True, - "redaction_findings": list(scan_result.findings), - }, - ) - # Defensive: REDACTED outcome but redacted_content is None - # (shouldn't happen). - logger.warning( - TOOL_OUTPUT_REDACTED, - tool_call_id=tool_call.id, - tool_name=tool_call.name, - findings=scan_result.findings, - note="no redacted content available — withholding output", - ) - return ToolExecutionResult( - content=( - "Sensitive data detected (fail-closed). Tool output withheld." - ), - is_error=True, - metadata={"output_scan_failed": True}, - ) + return self._handle_sensitive_scan(tool_call, result, scan_result) return result async def invoke(self, tool_call: ToolCall) -> ToolResult: diff --git a/tests/unit/security/test_models.py b/tests/unit/security/test_models.py index 98fd35d038..55967c2c01 100644 --- a/tests/unit/security/test_models.py +++ b/tests/unit/security/test_models.py @@ -565,3 +565,22 @@ def test_outcome_withheld_valid(self) -> None: ) assert result.outcome == ScanOutcome.WITHHELD assert result.redacted_content is None + + def test_outcome_redacted_requires_redacted_content(self) -> None: + """outcome=REDACTED with redacted_content=None is rejected.""" + with pytest.raises(ValidationError, match="redacted_content"): + OutputScanResult( + has_sensitive_data=True, + findings=("leak",), + outcome=ScanOutcome.REDACTED, + redacted_content=None, + ) + + def test_outcome_log_only_rejected_when_sensitive(self) -> None: + """outcome=LOG_ONLY is invalid when has_sensitive_data=True.""" + with pytest.raises(ValidationError, match="outcome"): + OutputScanResult( + has_sensitive_data=True, + findings=("secret",), + outcome=ScanOutcome.LOG_ONLY, + ) diff --git a/tests/unit/tools/test_invoker_security.py b/tests/unit/tools/test_invoker_security.py index c9847a574e..ab9d455827 100644 --- a/tests/unit/tools/test_invoker_security.py +++ b/tests/unit/tools/test_invoker_security.py @@ -862,12 +862,9 @@ async def test_withheld_metadata_uses_output_withheld_key( security_registry, security_interceptor=interceptor, ) - # Access the internal _scan_output to inspect metadata. - from ai_company.tools.base import ToolExecutionResult - + # Access _scan_output to inspect ToolExecutionResult.metadata + # (ToolResult does not surface metadata from the execution layer). tool_exec_result = ToolExecutionResult(content="raw output") - from ai_company.security.models import SecurityContext - context = SecurityContext( tool_name="secure_tool", tool_category=ToolCategory.FILE_SYSTEM, @@ -876,3 +873,53 @@ async def test_withheld_metadata_uses_output_withheld_key( scan_exec = await invoker._scan_output(tool_call, tool_exec_result, context) assert scan_exec.metadata.get("output_withheld") is True assert "output_scan_failed" not in scan_exec.metadata + + async def test_withheld_takes_priority_over_redacted_content( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """WITHHELD outcome withholds even when redacted_content is present.""" + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=OutputScanResult( + has_sensitive_data=True, + findings=("token",), + redacted_content="partially redacted output", + outcome=ScanOutcome.WITHHELD, + ), + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + result = await invoker.invoke(tool_call) + assert result.is_error is True + assert "withheld by security policy" in result.content.lower() + assert "partially redacted" not in result.content + + +# ── LOG_ONLY outcome tests ────────────────────────────────────── + + +@pytest.mark.unit +class TestLogOnlyOutcome: + """Tests for the LOG_ONLY scan outcome path in the invoker.""" + + async def test_log_only_passes_original_output_through( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """LOG_ONLY outcome passes original tool output through unchanged.""" + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=OutputScanResult(outcome=ScanOutcome.LOG_ONLY), + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + result = await invoker.invoke(tool_call) + assert result.is_error is False + assert result.content == "executed: ls" From b1c1c7c2ba204e3e3bbe14f655f41bb9d47b6cb1 Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 14 Mar 2026 16:00:48 +0100 Subject: [PATCH 3/6] fix: address 17 PR review items from 14 agents and 4 external reviewers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix wrong event constant in defensive fallback (TOOL_OUTPUT_REDACTED → TOOL_OUTPUT_WITHHELD) - Fix misleading comment about model validator preventing defensive fallback - Add WITHHELD requires redacted_content=None validator rule - Preserve result.metadata in WITHHELD branch (consistency with REDACTED) - Update WithholdPolicy docstring (no longer "fail-closed") - Add ScanOutcome to security/__init__.py module docstring - Clarify LOG_ONLY docstring (always has_sensitive_data=False) - Log fallback_outcome in service.py policy failure path - Fix _handle_sensitive_scan and _scan_output docstrings - Log unexpected outcome value in defensive fallback - Extract handle_sensitive_scan to scan_result_handler.py (invoker.py 848→779 lines) - Split TestWithheldOutcome/TestLogOnlyOutcome to test_invoker_output_scan.py - Add TestDefensiveFallback with model_copy-based tests for fail-closed path - Repurpose old fail-closed test to verify WITHHELD behavior - Inline helper tool classes via mocking (test file 926→785 lines) - Parametrize outcome validator tests --- src/ai_company/security/__init__.py | 3 +- src/ai_company/security/models.py | 11 +- src/ai_company/security/output_scan_policy.py | 4 +- src/ai_company/security/service.py | 4 +- src/ai_company/tools/invoker.py | 84 +---- src/ai_company/tools/scan_result_handler.py | 90 ++++++ tests/unit/security/test_models.py | 152 +++++---- tests/unit/tools/test_invoker_output_scan.py | 305 ++++++++++++++++++ tests/unit/tools/test_invoker_security.py | 180 ++--------- 9 files changed, 530 insertions(+), 303 deletions(-) create mode 100644 src/ai_company/tools/scan_result_handler.py create mode 100644 tests/unit/tools/test_invoker_output_scan.py diff --git a/src/ai_company/security/__init__.py b/src/ai_company/security/__init__.py index a55b88a773..14005305df 100644 --- a/src/ai_company/security/__init__.py +++ b/src/ai_company/security/__init__.py @@ -7,7 +7,8 @@ - ``SecurityVerdict`` / ``SecurityVerdictType`` — evaluation results. - ``SecurityContext`` — tool invocation context for evaluation. - ``AuditEntry`` / ``AuditLog`` — audit recording. -- ``OutputScanResult`` / ``OutputScanner`` — post-tool output scanning. +- ``OutputScanResult`` / ``ScanOutcome`` / ``OutputScanner`` + — post-tool output scanning. - ``OutputScanResponsePolicy`` — protocol for output scan policies. - ``RedactPolicy`` / ``WithholdPolicy`` / ``LogOnlyPolicy`` / ``AutonomyTieredPolicy`` — policy implementations. diff --git a/src/ai_company/security/models.py b/src/ai_company/security/models.py index 03aa819817..2744cda74d 100644 --- a/src/ai_company/security/models.py +++ b/src/ai_company/security/models.py @@ -27,11 +27,15 @@ class ScanOutcome(StrEnum): downstream consumers (e.g. ``ToolInvoker``) can distinguish intentional withholding from scanner failure. - Members: + Attributes: CLEAN: No sensitive data detected (default). REDACTED: Sensitive data found, redacted content available. WITHHELD: Content intentionally withheld by policy. - LOG_ONLY: Findings logged, original content passed through. + LOG_ONLY: Findings discarded by policy, original content passed + through. Always emitted with ``has_sensitive_data=False`` + because the policy resets the result — the audit log + (written by ``SecOpsService`` before the policy runs) is + the source of truth for what was actually detected. """ CLEAN = "clean" @@ -210,6 +214,9 @@ def _check_consistency(self) -> OutputScanResult: if self.outcome == ScanOutcome.REDACTED and self.redacted_content is None: msg = "redacted_content must not be None when outcome is 'redacted'" raise ValueError(msg) + if self.outcome == ScanOutcome.WITHHELD and self.redacted_content is not None: + msg = "redacted_content must be None when outcome is 'withheld'" + raise ValueError(msg) if self.outcome == ScanOutcome.LOG_ONLY and self.has_sensitive_data: msg = "outcome='log_only' is invalid when has_sensitive_data is True" raise ValueError(msg) diff --git a/src/ai_company/security/output_scan_policy.py b/src/ai_company/security/output_scan_policy.py index 3b1c169b23..3014d6bb7a 100644 --- a/src/ai_company/security/output_scan_policy.py +++ b/src/ai_company/security/output_scan_policy.py @@ -95,7 +95,9 @@ def apply( class WithholdPolicy: """Clear redacted content when sensitive data is found. - Forces fail-closed in the invoker — no partial data is returned. + Sets ``ScanOutcome.WITHHELD`` so the invoker returns a dedicated + "withheld by policy" error — no partial data is returned. This + is distinct from the fail-closed path used for scanner errors. The ``findings`` tuple is deliberately preserved so that audit consumers can categorise what was detected without seeing the actual content. diff --git a/src/ai_company/security/service.py b/src/ai_company/security/service.py index 138e78301b..efe4a5fb12 100644 --- a/src/ai_company/security/service.py +++ b/src/ai_company/security/service.py @@ -284,8 +284,10 @@ async def scan_output( SECURITY_INTERCEPTOR_ERROR, tool_name=context.tool_name, policy=policy_name, + fallback_outcome=result.outcome.value, note="Output scan policy application failed " - "— returning raw scan result", + "— returning raw scan result " + "(may be less strict than intended policy)", ) return result diff --git a/src/ai_company/tools/invoker.py b/src/ai_company/tools/invoker.py index 6a7f4102c7..fbc0391084 100644 --- a/src/ai_company/tools/invoker.py +++ b/src/ai_company/tools/invoker.py @@ -35,22 +35,16 @@ TOOL_INVOKE_SUCCESS, TOOL_INVOKE_TOOL_ERROR, TOOL_INVOKE_VALIDATION_UNEXPECTED, - TOOL_OUTPUT_REDACTED, - TOOL_OUTPUT_WITHHELD, TOOL_PERMISSION_DENIED, TOOL_SECURITY_DENIED, TOOL_SECURITY_ESCALATED, ) from ai_company.providers.models import ToolCall, ToolResult -from ai_company.security.models import ( - OutputScanResult, - ScanOutcome, - SecurityContext, - SecurityVerdictType, -) +from ai_company.security.models import SecurityContext, SecurityVerdictType from .base import ToolExecutionResult from .errors import ToolExecutionError, ToolNotFoundError, ToolParameterError +from .scan_result_handler import handle_sensitive_scan if TYPE_CHECKING: from collections.abc import Iterable @@ -277,67 +271,6 @@ async def _check_security( is_error=True, ) - def _handle_sensitive_scan( - self, - tool_call: ToolCall, - result: ToolExecutionResult, - scan_result: OutputScanResult, - ) -> ToolExecutionResult: - """Route a sensitive scan result to the correct handler. - - Branches on ``ScanOutcome``: - - - ``WITHHELD``: return error with "withheld by policy" message. - - ``REDACTED`` with content: return redacted content. - - Defensive fallback: withhold output (fail-closed). - """ - if scan_result.outcome == ScanOutcome.WITHHELD: - logger.warning( - TOOL_OUTPUT_WITHHELD, - tool_call_id=tool_call.id, - tool_name=tool_call.name, - findings=scan_result.findings, - note="content withheld by security policy", - ) - return ToolExecutionResult( - content=( - "Sensitive data detected — content withheld by security policy." - ), - is_error=True, - metadata={"output_withheld": True}, - ) - if scan_result.redacted_content is not None: - logger.warning( - TOOL_OUTPUT_REDACTED, - tool_call_id=tool_call.id, - tool_name=tool_call.name, - findings=scan_result.findings, - ) - return ToolExecutionResult( - content=scan_result.redacted_content, - is_error=result.is_error, - metadata={ - **result.metadata, - "output_redacted": True, - "redaction_findings": list(scan_result.findings), - }, - ) - # Defensive: model validator prevents REDACTED with - # redacted_content=None, so this is only reachable via - # mocks or future outcome values. - logger.warning( - TOOL_OUTPUT_REDACTED, - tool_call_id=tool_call.id, - tool_name=tool_call.name, - findings=scan_result.findings, - note="no redacted content available — withholding output", - ) - return ToolExecutionResult( - content=("Sensitive data detected (fail-closed). Tool output withheld."), - is_error=True, - metadata={"output_scan_failed": True}, - ) - async def _scan_output( self, tool_call: ToolCall, @@ -346,12 +279,11 @@ async def _scan_output( ) -> ToolExecutionResult: """Scan tool output for sensitive data (if interceptor is set). - When sensitive data is detected, the scan result's ``outcome`` - determines the response: - - - ``WITHHELD``: error result with "withheld by policy" message. - - ``REDACTED``: redacted content replaces the original output. - - ``LOG_ONLY`` / ``CLEAN``: original output passes through. + When sensitive data is detected (``has_sensitive_data=True``), + delegates to ``_handle_sensitive_scan`` which branches on + ``outcome`` (``WITHHELD`` vs ``REDACTED``). When no sensitive + data is detected (including ``LOG_ONLY`` and ``CLEAN`` + outcomes), the original output passes through unchanged. Scanner exceptions are caught and fail-closed — a generic error result is returned to prevent leaking sensitive data. @@ -379,7 +311,7 @@ async def _scan_output( ) if scan_result.has_sensitive_data: - return self._handle_sensitive_scan(tool_call, result, scan_result) + return handle_sensitive_scan(tool_call, result, scan_result) return result async def invoke(self, tool_call: ToolCall) -> ToolResult: diff --git a/src/ai_company/tools/scan_result_handler.py b/src/ai_company/tools/scan_result_handler.py new file mode 100644 index 0000000000..6b3b373b39 --- /dev/null +++ b/src/ai_company/tools/scan_result_handler.py @@ -0,0 +1,90 @@ +"""Output scan result handler — routes sensitive scan results. + +Standalone function extracted from ``ToolInvoker`` to keep +``invoker.py`` under the 800-line file limit. +""" + +from typing import TYPE_CHECKING + +from ai_company.observability import get_logger +from ai_company.observability.events.tool import ( + TOOL_OUTPUT_REDACTED, + TOOL_OUTPUT_WITHHELD, +) +from ai_company.security.models import OutputScanResult, ScanOutcome + +from .base import ToolExecutionResult + +if TYPE_CHECKING: + from ai_company.providers.models import ToolCall + +logger = get_logger(__name__) + + +def handle_sensitive_scan( + tool_call: ToolCall, + result: ToolExecutionResult, + scan_result: OutputScanResult, +) -> ToolExecutionResult: + """Route a sensitive scan result to the correct handler. + + Branches on ``ScanOutcome``: + + - ``WITHHELD``: return error with "withheld by policy" message. + - ``redacted_content`` present: return redacted content. + - Defensive fallback: withhold output (fail-closed). + + Args: + tool_call: The tool call being processed. + result: The original tool execution result. + scan_result: The scan result with ``has_sensitive_data=True``. + + Returns: + A new ``ToolExecutionResult`` reflecting the scan outcome. + """ + if scan_result.outcome == ScanOutcome.WITHHELD: + logger.warning( + TOOL_OUTPUT_WITHHELD, + tool_call_id=tool_call.id, + tool_name=tool_call.name, + findings=scan_result.findings, + note="content withheld by security policy", + ) + return ToolExecutionResult( + content=("Sensitive data detected — content withheld by security policy."), + is_error=True, + metadata={**result.metadata, "output_withheld": True}, + ) + if scan_result.redacted_content is not None: + logger.warning( + TOOL_OUTPUT_REDACTED, + tool_call_id=tool_call.id, + tool_name=tool_call.name, + findings=scan_result.findings, + ) + return ToolExecutionResult( + content=scan_result.redacted_content, + is_error=result.is_error, + metadata={ + **result.metadata, + "output_redacted": True, + "redaction_findings": list(scan_result.findings), + }, + ) + # Defensive: model_copy() skips model validators, so a policy + # that clears redacted_content without updating outcome could + # produce REDACTED with redacted_content=None. This branch + # catches that case (and future outcome values) — fail-closed. + logger.warning( + TOOL_OUTPUT_WITHHELD, + tool_call_id=tool_call.id, + tool_name=tool_call.name, + findings=scan_result.findings, + outcome=scan_result.outcome.value, + note="no redacted content available — withholding output", + ) + return ToolExecutionResult( + content="Sensitive data detected (fail-closed). Tool output withheld.", + is_error=True, + metadata={"output_scan_failed": True}, + ) diff --git a/tests/unit/security/test_models.py b/tests/unit/security/test_models.py index 55967c2c01..f228152404 100644 --- a/tests/unit/security/test_models.py +++ b/tests/unit/security/test_models.py @@ -522,65 +522,93 @@ def test_json_roundtrip(self) -> None: restored = OutputScanResult.model_validate_json(json_str) assert restored == result - def test_outcome_clean_rejected_when_sensitive(self) -> None: - """outcome=CLEAN is invalid when has_sensitive_data=True.""" - with pytest.raises(ValidationError, match="outcome"): - OutputScanResult( - has_sensitive_data=True, - findings=("leak",), - outcome=ScanOutcome.CLEAN, - ) - - def test_outcome_redacted_rejected_when_not_sensitive(self) -> None: - """outcome=REDACTED is invalid when has_sensitive_data=False.""" - with pytest.raises(ValidationError, match="outcome"): - OutputScanResult( - has_sensitive_data=False, - outcome=ScanOutcome.REDACTED, - ) - - def test_outcome_withheld_rejected_when_not_sensitive(self) -> None: - """outcome=WITHHELD is invalid when has_sensitive_data=False.""" - with pytest.raises(ValidationError, match="outcome"): - OutputScanResult( - has_sensitive_data=False, - outcome=ScanOutcome.WITHHELD, - ) - - def test_outcome_log_only_accepted_when_not_sensitive(self) -> None: - """outcome=LOG_ONLY is valid with has_sensitive_data=False.""" - result = OutputScanResult( - has_sensitive_data=False, - outcome=ScanOutcome.LOG_ONLY, - ) - assert result.outcome == ScanOutcome.LOG_ONLY - - def test_outcome_withheld_valid(self) -> None: - """outcome=WITHHELD is valid with has_sensitive_data=True.""" - result = OutputScanResult( - has_sensitive_data=True, - findings=("secret",), - outcome=ScanOutcome.WITHHELD, - redacted_content=None, - ) - assert result.outcome == ScanOutcome.WITHHELD - assert result.redacted_content is None - - def test_outcome_redacted_requires_redacted_content(self) -> None: - """outcome=REDACTED with redacted_content=None is rejected.""" - with pytest.raises(ValidationError, match="redacted_content"): - OutputScanResult( - has_sensitive_data=True, - findings=("leak",), - outcome=ScanOutcome.REDACTED, - redacted_content=None, - ) - - def test_outcome_log_only_rejected_when_sensitive(self) -> None: - """outcome=LOG_ONLY is invalid when has_sensitive_data=True.""" - with pytest.raises(ValidationError, match="outcome"): - OutputScanResult( - has_sensitive_data=True, - findings=("secret",), - outcome=ScanOutcome.LOG_ONLY, - ) + @pytest.mark.parametrize( + ("kwargs", "match"), + [ + pytest.param( + { + "has_sensitive_data": True, + "findings": ("leak",), + "outcome": ScanOutcome.CLEAN, + }, + "outcome", + id="clean-rejected-when-sensitive", + ), + pytest.param( + {"has_sensitive_data": False, "outcome": ScanOutcome.REDACTED}, + "outcome", + id="redacted-rejected-when-not-sensitive", + ), + pytest.param( + {"has_sensitive_data": False, "outcome": ScanOutcome.WITHHELD}, + "outcome", + id="withheld-rejected-when-not-sensitive", + ), + pytest.param( + { + "has_sensitive_data": True, + "findings": ("leak",), + "outcome": ScanOutcome.REDACTED, + "redacted_content": None, + }, + "redacted_content", + id="redacted-requires-redacted-content", + ), + pytest.param( + { + "has_sensitive_data": True, + "findings": ("secret",), + "outcome": ScanOutcome.LOG_ONLY, + }, + "outcome", + id="log-only-rejected-when-sensitive", + ), + pytest.param( + { + "has_sensitive_data": True, + "findings": ("secret",), + "outcome": ScanOutcome.WITHHELD, + "redacted_content": "should not be set", + }, + "redacted_content", + id="withheld-rejects-non-none-redacted-content", + ), + ], + ) + def test_outcome_validation_rejects_invalid( + self, + kwargs: dict[str, object], + match: str, + ) -> None: + """Parametrized: invalid outcome/field combinations are rejected.""" + with pytest.raises(ValidationError, match=match): + OutputScanResult(**kwargs) # type: ignore[arg-type] + + @pytest.mark.parametrize( + ("kwargs", "expected_outcome"), + [ + pytest.param( + {"has_sensitive_data": False, "outcome": ScanOutcome.LOG_ONLY}, + ScanOutcome.LOG_ONLY, + id="log-only-accepted-when-not-sensitive", + ), + pytest.param( + { + "has_sensitive_data": True, + "findings": ("secret",), + "outcome": ScanOutcome.WITHHELD, + "redacted_content": None, + }, + ScanOutcome.WITHHELD, + id="withheld-valid-when-sensitive", + ), + ], + ) + def test_outcome_validation_accepts_valid( + self, + kwargs: dict[str, object], + expected_outcome: ScanOutcome, + ) -> None: + """Parametrized: valid outcome/field combinations are accepted.""" + result = OutputScanResult(**kwargs) # type: ignore[arg-type] + assert result.outcome == expected_outcome diff --git a/tests/unit/tools/test_invoker_output_scan.py b/tests/unit/tools/test_invoker_output_scan.py new file mode 100644 index 0000000000..e724aae52f --- /dev/null +++ b/tests/unit/tools/test_invoker_output_scan.py @@ -0,0 +1,305 @@ +"""Tests for ScanOutcome-driven output scan handling in ToolInvoker. + +Covers the WITHHELD, LOG_ONLY, and defensive fallback branches of +``ToolInvoker._handle_sensitive_scan``. Split from +``test_invoker_security.py`` to keep file sizes under 800 lines. +""" + +from datetime import UTC, datetime +from typing import Any +from unittest.mock import AsyncMock + +import pytest + +from ai_company.core.enums import ApprovalRiskLevel, ToolCategory +from ai_company.providers.models import ToolCall +from ai_company.security.models import ( + OutputScanResult, + ScanOutcome, + SecurityContext, + SecurityVerdict, + SecurityVerdictType, +) +from ai_company.tools.base import BaseTool, ToolExecutionResult +from ai_company.tools.invoker import ToolInvoker +from ai_company.tools.registry import ToolRegistry + +pytestmark = pytest.mark.timeout(30) + +_NOW = datetime.now(UTC) + + +# ── Concrete test tool ─────────────────────────────────────────── + + +class _OutputScanTestTool(BaseTool): + """Simple tool for output scan integration tests.""" + + def __init__(self) -> None: + super().__init__( + name="secure_tool", + description="Test tool: secure_tool", + category=ToolCategory.FILE_SYSTEM, + ) + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + return ToolExecutionResult( + content=f"executed: {arguments.get('cmd', 'default')}", + ) + + +# ── Helpers ────────────────────────────────────────────────────── + + +def _make_verdict( + *, + verdict: SecurityVerdictType = SecurityVerdictType.ALLOW, + reason: str = "test reason", + risk_level: ApprovalRiskLevel = ApprovalRiskLevel.LOW, + approval_id: str | None = None, +) -> SecurityVerdict: + """Build a SecurityVerdict with sensible defaults.""" + return SecurityVerdict( + verdict=verdict, + reason=reason, + risk_level=risk_level, + evaluated_at=_NOW, + evaluation_duration_ms=1.0, + approval_id=approval_id, + ) + + +def _make_interceptor( + *, + pre_tool_verdict: SecurityVerdict | None = None, + scan_result: OutputScanResult | None = None, +) -> AsyncMock: + """Build a mock SecurityInterceptionStrategy.""" + interceptor = AsyncMock() + interceptor.evaluate_pre_tool = AsyncMock( + return_value=pre_tool_verdict or _make_verdict(), + ) + interceptor.scan_output = AsyncMock( + return_value=scan_result or OutputScanResult(), + ) + return interceptor + + +# ── Fixtures ───────────────────────────────────────────────────── + + +@pytest.fixture +def security_registry() -> ToolRegistry: + return ToolRegistry([_OutputScanTestTool()]) + + +@pytest.fixture +def tool_call() -> ToolCall: + return ToolCall( + id="call_scan_001", + name="secure_tool", + arguments={"cmd": "ls"}, + ) + + +# ── Withheld outcome tests ───────────────────────────────────── + + +@pytest.mark.unit +class TestWithheldOutcome: + """Tests for the WITHHELD scan outcome path in the invoker.""" + + async def test_withheld_outcome_returns_policy_message( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """Withheld outcome returns explicit policy message.""" + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=OutputScanResult( + has_sensitive_data=True, + findings=("secret token",), + redacted_content=None, + outcome=ScanOutcome.WITHHELD, + ), + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + result = await invoker.invoke(tool_call) + assert result.is_error is True + assert "withheld by security policy" in result.content.lower() + + async def test_withheld_metadata_uses_output_withheld_key( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """Withheld outcome sets output_withheld metadata, not output_scan_failed.""" + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=OutputScanResult( + has_sensitive_data=True, + findings=("credential",), + redacted_content=None, + outcome=ScanOutcome.WITHHELD, + ), + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + # Access _scan_output to inspect ToolExecutionResult.metadata + # (ToolResult does not surface the metadata field from the + # execution layer, so the public invoke() API cannot verify + # metadata keys). + tool_exec_result = ToolExecutionResult(content="raw output") + context = SecurityContext( + tool_name="secure_tool", + tool_category=ToolCategory.FILE_SYSTEM, + action_type="code:write", + ) + scan_exec = await invoker._scan_output(tool_call, tool_exec_result, context) + assert scan_exec.metadata.get("output_withheld") is True + assert "output_scan_failed" not in scan_exec.metadata + + async def test_withheld_takes_priority_over_redacted_content( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """WITHHELD outcome withholds even when redacted_content is present. + + The model validator now rejects WITHHELD + non-None redacted_content + at construction time. This test uses ``model_copy`` (which skips + validators) to verify the invoker's defence-in-depth branching: + WITHHELD is checked before redacted_content. + """ + # Build a valid WITHHELD result, then sneak in redacted_content + # via model_copy (which bypasses the model validator). + base = OutputScanResult( + has_sensitive_data=True, + findings=("token",), + redacted_content=None, + outcome=ScanOutcome.WITHHELD, + ) + broken = base.model_copy( + update={"redacted_content": "partially redacted output"}, + ) + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=broken, + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + result = await invoker.invoke(tool_call) + assert result.is_error is True + assert "withheld by security policy" in result.content.lower() + assert "partially redacted" not in result.content + + +# ── LOG_ONLY outcome tests ────────────────────────────────────── + + +@pytest.mark.unit +class TestLogOnlyOutcome: + """Tests for the LOG_ONLY scan outcome path in the invoker.""" + + async def test_log_only_passes_original_output_through( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """LOG_ONLY outcome passes original tool output through unchanged.""" + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=OutputScanResult(outcome=ScanOutcome.LOG_ONLY), + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + result = await invoker.invoke(tool_call) + assert result.is_error is False + assert result.content == "executed: ls" + + +# ── Defensive fallback tests ──────────────────────────────────── + + +@pytest.mark.unit +class TestDefensiveFallback: + """Tests for the defensive fail-closed fallback in _handle_sensitive_scan. + + This branch catches unexpected states where ``has_sensitive_data=True`` + but outcome is not ``WITHHELD`` and ``redacted_content`` is ``None``. + Reachable when ``model_copy()`` skips validators. + """ + + async def test_defensive_fallback_withholds_on_unexpected_state( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """Defensive fallback withholds when scan has sensitive data + but neither WITHHELD outcome nor redacted_content.""" + # Simulate a broken policy via model_copy (skips validators): + # REDACTED outcome but redacted_content cleared. + base = OutputScanResult( + has_sensitive_data=True, + findings=("leak",), + redacted_content="safe", + outcome=ScanOutcome.REDACTED, + ) + broken = base.model_copy(update={"redacted_content": None}) + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=broken, + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + result = await invoker.invoke(tool_call) + assert result.is_error is True + assert "fail-closed" in result.content.lower() + assert "executed:" not in result.content + + async def test_defensive_fallback_metadata_uses_scan_failed_key( + self, + security_registry: ToolRegistry, + tool_call: ToolCall, + ) -> None: + """Defensive fallback sets output_scan_failed metadata.""" + base = OutputScanResult( + has_sensitive_data=True, + findings=("leak",), + redacted_content="safe", + outcome=ScanOutcome.REDACTED, + ) + broken = base.model_copy(update={"redacted_content": None}) + interceptor = _make_interceptor( + pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), + scan_result=broken, + ) + invoker = ToolInvoker( + security_registry, + security_interceptor=interceptor, + ) + tool_exec_result = ToolExecutionResult(content="raw output") + context = SecurityContext( + tool_name="secure_tool", + tool_category=ToolCategory.FILE_SYSTEM, + action_type="code:write", + ) + scan_exec = await invoker._scan_output(tool_call, tool_exec_result, context) + assert scan_exec.metadata.get("output_scan_failed") is True + assert "output_withheld" not in scan_exec.metadata diff --git a/tests/unit/tools/test_invoker_security.py b/tests/unit/tools/test_invoker_security.py index ab9d455827..42a2462003 100644 --- a/tests/unit/tools/test_invoker_security.py +++ b/tests/unit/tools/test_invoker_security.py @@ -379,12 +379,12 @@ async def test_scan_output_called_after_successful_execution( await invoker.invoke(tool_call) interceptor.scan_output.assert_awaited_once() - async def test_sensitive_but_no_redacted_content_fails_closed( + async def test_withheld_outcome_returns_policy_message( self, security_registry: ToolRegistry, tool_call: ToolCall, ) -> None: - """If has_sensitive_data=True but redacted_content is None, fail-closed.""" + """WITHHELD outcome returns explicit policy message (not fail-closed).""" interceptor = _make_interceptor( pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), scan_result=OutputScanResult( @@ -574,49 +574,6 @@ async def test_scan_output_context_matches_pre_tool_context( assert pre_ctx.task_id == scan_ctx.task_id -# ── Helper tools for gap tests ─────────────────────────────────── - - -class _SecurityFailingTool(BaseTool): - """Tool that raises RuntimeError from execute.""" - - def __init__(self) -> None: - super().__init__( - name="failing_tool", - description="Tool that always fails", - category=ToolCategory.FILE_SYSTEM, - ) - - async def execute( - self, - *, - arguments: dict[str, Any], - ) -> ToolExecutionResult: - msg = "intentional failure" - raise RuntimeError(msg) - - -class _SecuritySoftErrorTool(BaseTool): - """Tool that returns is_error=True with sensitive content.""" - - def __init__(self) -> None: - super().__init__( - name="soft_error_tool", - description="Tool returning soft error", - category=ToolCategory.FILE_SYSTEM, - ) - - async def execute( - self, - *, - arguments: dict[str, Any], - ) -> ToolExecutionResult: - return ToolExecutionResult( - is_error=True, - content="error: API_KEY=AKIA1234567890EXAMPLE", - ) - - # ── Gap 1: Non-recoverable errors from scan propagate ──────────── @@ -659,7 +616,10 @@ class TestOutputScanSkippedOnToolError: """When tool.execute() raises, scan_output is not called.""" async def test_tool_execution_error_skips_output_scan(self) -> None: - failing_tool = _SecurityFailingTool() + failing_tool = _SecurityTestTool(name="failing_tool") + failing_tool.execute = AsyncMock( # type: ignore[method-assign] + side_effect=RuntimeError("intentional failure"), + ) registry = ToolRegistry([failing_tool]) interceptor = _make_interceptor( pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), @@ -770,7 +730,13 @@ class TestOutputScanOnSoftError: async def test_soft_error_content_is_scanned(self) -> None: """When tool returns is_error=True, scan_output is still called.""" - soft_tool = _SecuritySoftErrorTool() + soft_tool = _SecurityTestTool(name="soft_error_tool") + soft_tool.execute = AsyncMock( # type: ignore[method-assign] + return_value=ToolExecutionResult( + is_error=True, + content="error: API_KEY=AKIA1234567890EXAMPLE", + ), + ) registry = ToolRegistry([soft_tool]) scan_result = OutputScanResult( has_sensitive_data=True, @@ -796,7 +762,13 @@ async def test_soft_error_content_is_scanned(self) -> None: async def test_soft_error_scan_receives_error_content(self) -> None: """Verify scan_output receives the error content string.""" - soft_tool = _SecuritySoftErrorTool() + soft_tool = _SecurityTestTool(name="soft_error_tool") + soft_tool.execute = AsyncMock( # type: ignore[method-assign] + return_value=ToolExecutionResult( + is_error=True, + content="error: API_KEY=AKIA1234567890EXAMPLE", + ), + ) registry = ToolRegistry([soft_tool]) interceptor = _make_interceptor( pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), @@ -811,115 +783,3 @@ async def test_soft_error_scan_receives_error_content(self) -> None: scan_args = interceptor.scan_output.call_args[0] assert scan_args[1] == "error: API_KEY=AKIA1234567890EXAMPLE" - - -# ── Withheld outcome tests ───────────────────────────────────── - - -@pytest.mark.unit -class TestWithheldOutcome: - """Tests for the WITHHELD scan outcome path in the invoker.""" - - async def test_withheld_outcome_returns_policy_message( - self, - security_registry: ToolRegistry, - tool_call: ToolCall, - ) -> None: - """Withheld outcome returns explicit policy message.""" - interceptor = _make_interceptor( - pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), - scan_result=OutputScanResult( - has_sensitive_data=True, - findings=("secret token",), - redacted_content=None, - outcome=ScanOutcome.WITHHELD, - ), - ) - invoker = ToolInvoker( - security_registry, - security_interceptor=interceptor, - ) - result = await invoker.invoke(tool_call) - assert result.is_error is True - assert "withheld by security policy" in result.content.lower() - - async def test_withheld_metadata_uses_output_withheld_key( - self, - security_registry: ToolRegistry, - tool_call: ToolCall, - ) -> None: - """Withheld outcome sets output_withheld metadata, not output_scan_failed.""" - interceptor = _make_interceptor( - pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), - scan_result=OutputScanResult( - has_sensitive_data=True, - findings=("credential",), - redacted_content=None, - outcome=ScanOutcome.WITHHELD, - ), - ) - invoker = ToolInvoker( - security_registry, - security_interceptor=interceptor, - ) - # Access _scan_output to inspect ToolExecutionResult.metadata - # (ToolResult does not surface metadata from the execution layer). - tool_exec_result = ToolExecutionResult(content="raw output") - context = SecurityContext( - tool_name="secure_tool", - tool_category=ToolCategory.FILE_SYSTEM, - action_type="code:write", - ) - scan_exec = await invoker._scan_output(tool_call, tool_exec_result, context) - assert scan_exec.metadata.get("output_withheld") is True - assert "output_scan_failed" not in scan_exec.metadata - - async def test_withheld_takes_priority_over_redacted_content( - self, - security_registry: ToolRegistry, - tool_call: ToolCall, - ) -> None: - """WITHHELD outcome withholds even when redacted_content is present.""" - interceptor = _make_interceptor( - pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), - scan_result=OutputScanResult( - has_sensitive_data=True, - findings=("token",), - redacted_content="partially redacted output", - outcome=ScanOutcome.WITHHELD, - ), - ) - invoker = ToolInvoker( - security_registry, - security_interceptor=interceptor, - ) - result = await invoker.invoke(tool_call) - assert result.is_error is True - assert "withheld by security policy" in result.content.lower() - assert "partially redacted" not in result.content - - -# ── LOG_ONLY outcome tests ────────────────────────────────────── - - -@pytest.mark.unit -class TestLogOnlyOutcome: - """Tests for the LOG_ONLY scan outcome path in the invoker.""" - - async def test_log_only_passes_original_output_through( - self, - security_registry: ToolRegistry, - tool_call: ToolCall, - ) -> None: - """LOG_ONLY outcome passes original tool output through unchanged.""" - interceptor = _make_interceptor( - pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), - scan_result=OutputScanResult(outcome=ScanOutcome.LOG_ONLY), - ) - invoker = ToolInvoker( - security_registry, - security_interceptor=interceptor, - ) - result = await invoker.invoke(tool_call) - assert result.is_error is False - assert result.content == "executed: ls" From 76bbd32878da3b218aa64280aad4407ae2a18bca Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 14 Mar 2026 16:14:01 +0100 Subject: [PATCH 4/6] fix: address 5 external reviewer findings (Gemini round 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add missing inverse findings constraint: reject empty findings when has_sensitive_data=True (closes validator asymmetry) - Preserve result.metadata in defensive fallback branch (consistency with WITHHELD and REDACTED branches) - Fix _scan_output docstring: reference handle_sensitive_scan (not the removed _handle_sensitive_scan) - Fix test module docstring: reference handle_sensitive_scan - Replace execute method reassignment with proper subclasses (_FailingSecurityTool, _SoftErrorSecurityTool) — eliminates type: ignore comments --- src/ai_company/security/models.py | 3 ++ src/ai_company/tools/invoker.py | 2 +- src/ai_company/tools/scan_result_handler.py | 2 +- tests/unit/security/test_models.py | 9 ++++ tests/unit/tools/test_invoker_output_scan.py | 5 +- tests/unit/tools/test_invoker_security.py | 56 ++++++++++++-------- 6 files changed, 52 insertions(+), 25 deletions(-) diff --git a/src/ai_company/security/models.py b/src/ai_company/security/models.py index 2744cda74d..4e1fab6150 100644 --- a/src/ai_company/security/models.py +++ b/src/ai_company/security/models.py @@ -211,6 +211,9 @@ def _check_consistency(self) -> OutputScanResult: elif self.outcome == ScanOutcome.CLEAN: msg = "outcome='clean' is invalid when has_sensitive_data is True" raise ValueError(msg) + elif not self.findings: + msg = "findings must not be empty when has_sensitive_data is True" + raise ValueError(msg) if self.outcome == ScanOutcome.REDACTED and self.redacted_content is None: msg = "redacted_content must not be None when outcome is 'redacted'" raise ValueError(msg) diff --git a/src/ai_company/tools/invoker.py b/src/ai_company/tools/invoker.py index fbc0391084..344247a574 100644 --- a/src/ai_company/tools/invoker.py +++ b/src/ai_company/tools/invoker.py @@ -280,7 +280,7 @@ async def _scan_output( """Scan tool output for sensitive data (if interceptor is set). When sensitive data is detected (``has_sensitive_data=True``), - delegates to ``_handle_sensitive_scan`` which branches on + delegates to ``handle_sensitive_scan`` which branches on ``outcome`` (``WITHHELD`` vs ``REDACTED``). When no sensitive data is detected (including ``LOG_ONLY`` and ``CLEAN`` outcomes), the original output passes through unchanged. diff --git a/src/ai_company/tools/scan_result_handler.py b/src/ai_company/tools/scan_result_handler.py index 6b3b373b39..4911284802 100644 --- a/src/ai_company/tools/scan_result_handler.py +++ b/src/ai_company/tools/scan_result_handler.py @@ -86,5 +86,5 @@ def handle_sensitive_scan( return ToolExecutionResult( content="Sensitive data detected (fail-closed). Tool output withheld.", is_error=True, - metadata={"output_scan_failed": True}, + metadata={**result.metadata, "output_scan_failed": True}, ) diff --git a/tests/unit/security/test_models.py b/tests/unit/security/test_models.py index f228152404..992d5fc3ff 100644 --- a/tests/unit/security/test_models.py +++ b/tests/unit/security/test_models.py @@ -563,6 +563,15 @@ def test_json_roundtrip(self) -> None: "outcome", id="log-only-rejected-when-sensitive", ), + pytest.param( + { + "has_sensitive_data": True, + "findings": (), + "outcome": ScanOutcome.WITHHELD, + }, + "findings", + id="empty-findings-rejected-when-sensitive", + ), pytest.param( { "has_sensitive_data": True, diff --git a/tests/unit/tools/test_invoker_output_scan.py b/tests/unit/tools/test_invoker_output_scan.py index e724aae52f..832c4f658e 100644 --- a/tests/unit/tools/test_invoker_output_scan.py +++ b/tests/unit/tools/test_invoker_output_scan.py @@ -1,8 +1,9 @@ """Tests for ScanOutcome-driven output scan handling in ToolInvoker. Covers the WITHHELD, LOG_ONLY, and defensive fallback branches of -``ToolInvoker._handle_sensitive_scan``. Split from -``test_invoker_security.py`` to keep file sizes under 800 lines. +``handle_sensitive_scan`` (exercised via the ``ToolInvoker`` flow). +Split from ``test_invoker_security.py`` to keep file sizes under +800 lines. """ from datetime import UTC, datetime diff --git a/tests/unit/tools/test_invoker_security.py b/tests/unit/tools/test_invoker_security.py index 42a2462003..a38e024350 100644 --- a/tests/unit/tools/test_invoker_security.py +++ b/tests/unit/tools/test_invoker_security.py @@ -52,6 +52,38 @@ async def execute( ) +class _FailingSecurityTool(_SecurityTestTool): + """Tool that raises RuntimeError from execute.""" + + def __init__(self) -> None: + super().__init__(name="failing_tool") + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + msg = "intentional failure" + raise RuntimeError(msg) + + +class _SoftErrorSecurityTool(_SecurityTestTool): + """Tool that returns is_error=True with sensitive content.""" + + def __init__(self) -> None: + super().__init__(name="soft_error_tool") + + async def execute( + self, + *, + arguments: dict[str, Any], + ) -> ToolExecutionResult: + return ToolExecutionResult( + is_error=True, + content="error: API_KEY=AKIA1234567890EXAMPLE", + ) + + # ── Helpers ────────────────────────────────────────────────────── _NOW = datetime.now(UTC) @@ -616,11 +648,7 @@ class TestOutputScanSkippedOnToolError: """When tool.execute() raises, scan_output is not called.""" async def test_tool_execution_error_skips_output_scan(self) -> None: - failing_tool = _SecurityTestTool(name="failing_tool") - failing_tool.execute = AsyncMock( # type: ignore[method-assign] - side_effect=RuntimeError("intentional failure"), - ) - registry = ToolRegistry([failing_tool]) + registry = ToolRegistry([_FailingSecurityTool()]) interceptor = _make_interceptor( pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), ) @@ -730,14 +758,7 @@ class TestOutputScanOnSoftError: async def test_soft_error_content_is_scanned(self) -> None: """When tool returns is_error=True, scan_output is still called.""" - soft_tool = _SecurityTestTool(name="soft_error_tool") - soft_tool.execute = AsyncMock( # type: ignore[method-assign] - return_value=ToolExecutionResult( - is_error=True, - content="error: API_KEY=AKIA1234567890EXAMPLE", - ), - ) - registry = ToolRegistry([soft_tool]) + registry = ToolRegistry([_SoftErrorSecurityTool()]) scan_result = OutputScanResult( has_sensitive_data=True, findings=("API key",), @@ -762,14 +783,7 @@ async def test_soft_error_content_is_scanned(self) -> None: async def test_soft_error_scan_receives_error_content(self) -> None: """Verify scan_output receives the error content string.""" - soft_tool = _SecurityTestTool(name="soft_error_tool") - soft_tool.execute = AsyncMock( # type: ignore[method-assign] - return_value=ToolExecutionResult( - is_error=True, - content="error: API_KEY=AKIA1234567890EXAMPLE", - ), - ) - registry = ToolRegistry([soft_tool]) + registry = ToolRegistry([_SoftErrorSecurityTool()]) interceptor = _make_interceptor( pre_tool_verdict=_make_verdict(verdict=SecurityVerdictType.ALLOW), ) From ba500119a7f75ea72b198c7c7d1c203256706a3e Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 14 Mar 2026 16:21:08 +0100 Subject: [PATCH 5/6] fix: preserve result.metadata in scanner-exception fail-closed branch Copilot correctly identified that the scanner-exception path in _scan_output drops tool execution metadata. Align with handle_sensitive_scan by merging result.metadata. --- src/ai_company/tools/invoker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ai_company/tools/invoker.py b/src/ai_company/tools/invoker.py index 344247a574..7a3388a130 100644 --- a/src/ai_company/tools/invoker.py +++ b/src/ai_company/tools/invoker.py @@ -307,7 +307,7 @@ async def _scan_output( return ToolExecutionResult( content="Output scan failed (fail-closed). Tool output withheld.", is_error=True, - metadata={"output_scan_failed": True}, + metadata={**result.metadata, "output_scan_failed": True}, ) if scan_result.has_sensitive_data: From 5b51f369e435ef9f806bc8dfbfa5c05dd25b9e6c Mon Sep 17 00:00:00 2001 From: Aurelio <19254254+Aureliolo@users.noreply.github.com> Date: Sat, 14 Mar 2026 16:25:45 +0100 Subject: [PATCH 6/6] test: add sentinel metadata to verify metadata preservation in scan tests - Add sentinel metadata key to ToolExecutionResult in _scan_output tests so regressions in metadata merging are caught - Fix TestDefensiveFallback docstring to reference handle_sensitive_scan --- tests/unit/tools/test_invoker_output_scan.py | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/tests/unit/tools/test_invoker_output_scan.py b/tests/unit/tools/test_invoker_output_scan.py index 832c4f658e..db1fccc234 100644 --- a/tests/unit/tools/test_invoker_output_scan.py +++ b/tests/unit/tools/test_invoker_output_scan.py @@ -160,7 +160,10 @@ async def test_withheld_metadata_uses_output_withheld_key( # (ToolResult does not surface the metadata field from the # execution layer, so the public invoke() API cannot verify # metadata keys). - tool_exec_result = ToolExecutionResult(content="raw output") + tool_exec_result = ToolExecutionResult( + content="raw output", + metadata={"sentinel": True}, + ) context = SecurityContext( tool_name="secure_tool", tool_category=ToolCategory.FILE_SYSTEM, @@ -168,6 +171,7 @@ async def test_withheld_metadata_uses_output_withheld_key( ) scan_exec = await invoker._scan_output(tool_call, tool_exec_result, context) assert scan_exec.metadata.get("output_withheld") is True + assert scan_exec.metadata.get("sentinel") is True assert "output_scan_failed" not in scan_exec.metadata async def test_withheld_takes_priority_over_redacted_content( @@ -238,11 +242,12 @@ async def test_log_only_passes_original_output_through( @pytest.mark.unit class TestDefensiveFallback: - """Tests for the defensive fail-closed fallback in _handle_sensitive_scan. + """Tests for the defensive fail-closed fallback in handle_sensitive_scan. - This branch catches unexpected states where ``has_sensitive_data=True`` - but outcome is not ``WITHHELD`` and ``redacted_content`` is ``None``. - Reachable when ``model_copy()`` skips validators. + Exercised via ``ToolInvoker._scan_output``. This branch catches + unexpected states where ``has_sensitive_data=True`` but outcome is + not ``WITHHELD`` and ``redacted_content`` is ``None``. Reachable + when ``model_copy()`` skips validators. """ async def test_defensive_fallback_withholds_on_unexpected_state( @@ -295,7 +300,10 @@ async def test_defensive_fallback_metadata_uses_scan_failed_key( security_registry, security_interceptor=interceptor, ) - tool_exec_result = ToolExecutionResult(content="raw output") + tool_exec_result = ToolExecutionResult( + content="raw output", + metadata={"sentinel": True}, + ) context = SecurityContext( tool_name="secure_tool", tool_category=ToolCategory.FILE_SYSTEM, @@ -303,4 +311,5 @@ async def test_defensive_fallback_metadata_uses_scan_failed_key( ) scan_exec = await invoker._scan_output(tool_call, tool_exec_result, context) assert scan_exec.metadata.get("output_scan_failed") is True + assert scan_exec.metadata.get("sentinel") is True assert "output_withheld" not in scan_exec.metadata