Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions daemon/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,23 @@ async def history(
"as_of": as_of,
},
)

async def usage_summary(
self,
*,
repo_id: str,
days: int = 7,
) -> dict[str, Any]:
"""Invoke ``read.usage_summary`` on the daemon and return the raw payload.

The MCP-side ``handle_usage_summary`` facade is responsible for
round-tripping this dict through ``UsageSummaryResult.model_validate``
to enforce the wire shape before returning to the agent.
"""
return await self._call_with_retry(
"read.usage_summary",
{
"repo_id": repo_id,
"days": days,
},
)
29 changes: 28 additions & 1 deletion handlers/usage_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,34 @@

@read_tool("read.usage_summary")
async def handle_usage_summary(ctx, days: int = 7) -> dict:
"""Aggregate usage stats over the last `days` days.
"""MCP-side facade for ``read.usage_summary``.

Phase 2c-5 — mirror of 2c-4's pattern for handle_history. The body is
now a thin wrapper that delegates to the daemon via
``ctx.daemon.usage_summary``; the real read logic lives in
``_handle_usage_summary_impl`` (which the daemon's protocol handler
invokes on its end).
"""
daemon = getattr(ctx, "daemon", None)
if daemon is None:
# Test contexts that mock-construct BicameralContext without going
# through ``from_env`` won't have a daemon proxy. Fall through to
# the in-process implementation so handler-level tests stay
# sociable. Production paths always go through ``from_env`` →
# ``DaemonProxy``.
return await _handle_usage_summary_impl(ctx, days=days)

raw = await daemon.usage_summary(repo_id="local", days=days)
from protocol.contracts import UsageSummaryResult

return UsageSummaryResult.model_validate(raw).model_dump()


async def _handle_usage_summary_impl(ctx, days: int = 7) -> dict:
"""Core usage_summary logic — pure ledger read + transformation.

Invoked by the daemon's ``read.usage_summary`` protocol handler. Does
NOT route through the daemon — that would create an infinite RPC loop.

Returns the schema specified in #42:
period_days, ingest_calls, bind_calls_total, decisions_ingested,
Expand Down
11 changes: 7 additions & 4 deletions protocol/handlers/reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,14 @@ async def handle_read_usage_summary(
) -> dict[str, Any]:
req = UsageSummaryRequest.model_validate(params)
bctx = _resolve_context(ctx, req.repo_id)
from handlers.usage_summary import handle_usage_summary
# Phase 2c-5: call the core impl, NOT the facade. The facade routes
# through ``ctx.daemon.usage_summary(...)``; invoking it from inside the
# daemon's own protocol handler would create an infinite RPC loop.
from handlers.usage_summary import _handle_usage_summary_impl

raw = await handle_usage_summary(bctx, days=req.days)
# ``handle_usage_summary`` returns a plain dict; round-trip it through the
# typed result so the wire shape is enforced.
raw = await _handle_usage_summary_impl(bctx, days=req.days)
# ``_handle_usage_summary_impl`` returns a plain dict; round-trip it through
# the typed result so the wire shape is enforced.
return UsageSummaryResult.model_validate(raw).model_dump()


Expand Down
7 changes: 6 additions & 1 deletion tests/test_usage_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@

import pytest

from handlers.usage_summary import handle_usage_summary
# Phase 2c-5: existing usage_summary tests exercise the core read logic against
# a mock ledger — that's sociable handler-level testing and should NOT require
# a running daemon. Alias the impl so the test bodies stay identical; new
# boundary tests live in ``tests/test_usage_summary_via_daemon.py`` and go
# through the facade + a per-test daemon fixture.
from handlers.usage_summary import _handle_usage_summary_impl as handle_usage_summary


def _ctx_with_decisions(
Expand Down
171 changes: 171 additions & 0 deletions tests/test_usage_summary_via_daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
"""Phase 2c-5 boundary tests: handle_usage_summary through a real daemon subprocess.

Mirrors ``tests/test_history_via_daemon.py`` — same structure, same fixture,
same Fowler boundary-test rationale. Each test exercises the full call chain
across the IPC boundary:

handle_usage_summary (facade in MCP process)
→ ctx.daemon.usage_summary (DaemonProxy)
→ ProtocolClient over UDS
→ daemon subprocess
→ protocol/handlers/reads.handle_read_usage_summary
→ _handle_usage_summary_impl (in daemon's ledger)

Three things these tests verify that no in-process test can:

1. **Wire serialization** — UsageSummaryResult round-trips through JSON.
2. **Connection lifecycle** — descriptor missing / daemon dead → clear
``DaemonUnreachableError``; reconnect after daemon restart succeeds.
3. **Same-shape contract** — the facade returns equivalent dict shape to
what ``_handle_usage_summary_impl`` returns in-process.

Cost: ~8s per test (daemon subprocess spawn). Per the plan, this is acceptable
for boundary tests. When per-test cost becomes painful, swap the fixture body
for an ObjectPool of pre-warmed daemons without touching any test.
"""

from __future__ import annotations

import os
import subprocess
from types import SimpleNamespace

import pytest

from daemon.process import spawn, stop
from daemon.proxy import DaemonProxy, DaemonUnreachableError
from tests._daemon_fixture import daemon_subprocess, short_state_dir # noqa: F401


@pytest.fixture
def fresh_ledger_repo(monkeypatch, tmp_path):
"""A bare git repo + memory:// ledger env. The daemon picks these up
via ``REPO_PATH`` / ``SURREAL_URL`` when ``BicameralContext.from_env``
runs inside it.
"""
monkeypatch.setenv("SURREAL_URL", "memory://")
monkeypatch.setenv("REPO_PATH", str(tmp_path))
subprocess.run(["git", "init", "-q"], cwd=tmp_path, check=True)
subprocess.run(
["git", "commit", "--allow-empty", "-q", "-m", "init"],
cwd=tmp_path,
check=True,
env={
**os.environ,
"GIT_AUTHOR_NAME": "t",
"GIT_AUTHOR_EMAIL": "t@t",
"GIT_COMMITTER_NAME": "t",
"GIT_COMMITTER_EMAIL": "t@t",
},
)
return tmp_path


# ── Daemon unreachable ──────────────────────────────────────────────────


async def test_proxy_raises_when_no_descriptor(tmp_path):
"""No ``daemon.json`` and no ``auth.json`` → ``DaemonUnreachableError``
with the wizard-pointing message."""
proxy = DaemonProxy(
descriptor_path=tmp_path / "daemon.json",
auth_path=tmp_path / "auth.json",
)
with pytest.raises(DaemonUnreachableError) as exc_info:
await proxy.usage_summary(repo_id="local")
msg = str(exc_info.value)
assert "bicameral-mcp setup" in msg
assert "bicameral-mcp daemon start" in msg


# ── Daemon-routed happy path ────────────────────────────────────────────


async def test_usage_summary_through_daemon_returns_baseline(
daemon_subprocess, fresh_ledger_repo, tmp_path
):
"""End-to-end: spawned daemon, DaemonProxy, real RPC, empty ledger
returns the zero-count baseline UsageSummaryResult."""
proxy = DaemonProxy(
descriptor_path=daemon_subprocess.socket_path.parent / "daemon.json",
auth_path=tmp_path / "no-auth.json", # absent
)
try:
result = await proxy.usage_summary(repo_id="local", days=7)
# All numeric fields must be present with zero-count baseline values.
assert "period_days" in result
assert "decisions_ingested" in result
assert result["period_days"] == 7
assert result["decisions_ingested"] == 0
assert result["reflected_pct"] == 0.0
assert result["drift_pct"] == 0.0
finally:
await proxy.close()


async def test_usage_summary_facade_routes_through_daemon(
daemon_subprocess, fresh_ledger_repo, tmp_path
):
"""The MCP-side facade ``handle_usage_summary`` calls
``ctx.daemon.usage_summary`` and returns a dict with the correct shape.
Asserts the daemon path is exercised (not the in-process fallback for
daemon=None contexts)."""
from handlers.usage_summary import handle_usage_summary

proxy = DaemonProxy(
descriptor_path=daemon_subprocess.socket_path.parent / "daemon.json",
auth_path=tmp_path / "no-auth.json",
)
# Minimal ctx — usage_summary does not call ensure_ledger_synced,
# so we only need repo_path + daemon on the context object.
ctx = SimpleNamespace(
repo_path=str(fresh_ledger_repo),
daemon=proxy,
)
try:
result = await handle_usage_summary(ctx, days=7)
assert isinstance(result, dict)
assert result["period_days"] == 7
assert result["decisions_ingested"] == 0
assert "reflected_pct" in result
assert "drift_pct" in result
finally:
await proxy.close()


# ── Reconnect after daemon restart ──────────────────────────────────────


async def test_proxy_reconnects_after_daemon_restart(short_state_dir, fresh_ledger_repo):
"""First call → daemon dies → daemon restarts → second call succeeds.

Contract: the proxy detects the dropped connection on the failing
call, clears its cached client, re-resolves the descriptor, opens
a new connection, and retries. ONE retry max — if the second open
also fails, raise ``DaemonUnreachableError``.
"""
socket_path = short_state_dir / "daemon.sock"
descriptor_path = short_state_dir / "daemon.json"

# Spawn the first daemon and make a successful call.
spawn(socket_path=socket_path, descriptor_path=descriptor_path)
proxy = DaemonProxy(
descriptor_path=descriptor_path,
auth_path=short_state_dir / "no-auth.json",
)
result1 = await proxy.usage_summary(repo_id="local")
assert "period_days" in result1

# Kill the daemon. The proxy's cached client now points at a dead socket.
stop(descriptor_path=descriptor_path)

# Respawn — same paths, different PID.
spawn(socket_path=socket_path, descriptor_path=descriptor_path)

try:
# Next call should detect the broken connection, reconnect, succeed.
result2 = await proxy.usage_summary(repo_id="local")
assert "period_days" in result2
finally:
await proxy.close()
stop(descriptor_path=descriptor_path)
Loading