diff --git a/IMPLEMENTATION_PLAN.md b/IMPLEMENTATION_PLAN.md index 3270c70c2..f8fba1926 100644 --- a/IMPLEMENTATION_PLAN.md +++ b/IMPLEMENTATION_PLAN.md @@ -340,11 +340,14 @@ Done when: **Previously:** Task 1.7 Done when: -- [ ] Web search tool with Brave Search API and SearXNG backends, configurable via `netclaw.json`. -- [ ] Web fetch tool with HTML-to-text extraction and output truncation. -- [ ] Shell execution tool with timeout, output truncation, stdin closure, working directory. -- [ ] GitHub CLI tool via `gh` shell-out with structured output parsing and missing dependency handling. -- [ ] Tests for each tool with mocked HTTP/process dependencies. +- [x] Shell execution tool with timeout, output truncation, stdin closure, working directory. +- [x] File read and file write tools with path validation and output truncation. +- [x] Source-generated tool schemas via Roslyn incremental generator (ADR-001). +- [ ] ~~Web search tool~~ — deferred, not needed for minimal viable concept. +- [ ] ~~Web fetch tool~~ — deferred, not needed for minimal viable concept. + +> **Note:** GitHub CLI access is handled via `shell_execute` + `gh` — no dedicated tool needed. +> Web search and web fetch deferred — shell + file tools are sufficient to prove the concept. ### Task 1.10: Full provider abstraction with MEAI and fallback diff --git a/Netclaw.slnx b/Netclaw.slnx index f498365d3..d5121f044 100644 --- a/Netclaw.slnx +++ b/Netclaw.slnx @@ -9,6 +9,7 @@ + diff --git a/docs/research/agent-gateway-architecture.md b/docs/research/agent-gateway-architecture.md new file mode 100644 index 000000000..19d6b8b71 --- /dev/null +++ b/docs/research/agent-gateway-architecture.md @@ -0,0 +1,126 @@ +# Agent Gateway Architecture Research + +**Date:** 2026-02-22 +**Context:** Architectural research before designing Netclaw's channel abstraction + +## Projects Studied + +| Project | Language | Stars | Architecture | +|---------|----------|-------|-------------| +| OpenClaw | TypeScript | 218k+ | Gateway + Agent in one Node.js process | +| IronClaw | Rust | 3k | Hub-and-spoke, Axum gateway + Agent loop as Tokio tasks | +| ZeroClaw | Rust | 17k | daemon command runs gateway + channels + agent + scheduler as Tokio tasks | + +## Consensus: Single Process, Logical Separation + +All three are single-process architectures. None split gateway and agent into +separate processes for normal operation. + +### Gateway vs Agent Boundary + +All three have a clean logical boundary but no process boundary: + +- **OpenClaw**: Gateway is a WebSocket control plane (`ws://127.0.0.1:18789`), + Agent is an embedded "peer" in the same process. External agents can connect + via ACP (stdin/stdout NDJSON protocol), but the default agent is in-process. +- **IronClaw**: Gateway is an Axum HTTP server implementing the `Channel` trait. + Agent loop processes messages from all channels via a merged async stream. + Docker containers are the only separate processes (for sandboxed tool + execution). +- **ZeroClaw**: Same pattern — gateway, channels, agent, scheduler all + supervised as Tokio tasks in one process. Optimized for $10 hardware + (Raspberry Pi Zero). + +### Security Perimeter + +All three put the security boundary at the gateway/channel layer, not between +gateway and agent: + +- **OpenClaw**: DM pairing (default-deny, approve unknown senders), role-based + access (operator/node), tool policy (glob allow/deny), exec approval gates, + optional Docker sandbox. +- **IronClaw**: Device pairing (one-time code → bearer token), WASM sandbox for + tools, Docker sandbox for shell, safety layer (prompt injection defense, secret + leak scanning), tool domain separation (Orchestrator vs Container). +- **ZeroClaw**: Device pairing, three autonomy levels (ReadOnly/Supervised/Full), + command allowlists, forbidden path blocklists, rate limits, approval workflows, + env sanitization on every shell exec. + +### CLI/TUI as a Channel + +In all three, the CLI/TUI is just another channel — it implements the same +interface as Slack, Discord, etc. and feeds messages into the same pipeline. + +- OpenClaw: `openclaw tui` connects to the Gateway WebSocket +- IronClaw: `CliChannel` implements the `Channel` trait, feeds `ChannelMessage` + into same `mpsc::Sender` as all channels +- ZeroClaw: `CliChannel` feeds into shared `mpsc::Sender` + +### Tool Execution: Layered Security + +All three use layered tool security: + +1. **Policy/allowlist filtering** — which tools the agent can see +2. **Runtime approval gates** — user confirms before dangerous operations +3. **Sandbox isolation** — Docker/WASM for shell execution +4. **Output sanitization** — secret scrubbing before feeding back to LLM + +### Channel Trait Patterns + +**IronClaw** (`Channel` trait in Rust): +``` +- name() -> &str +- send(message) -> Result +- listen(tx: mpsc::Sender) -> Result +- health_check() -> bool +- start_typing(recipient) / stop_typing(recipient) +- supports_draft_updates() -> bool +- send_draft() / update_draft() / finalize_draft() +- add_reaction() +``` + +**ZeroClaw** (`Channel` trait in Rust): +``` +- Same push-based pattern: listen() receives an mpsc::Sender +- All channels feed ChannelMessage structs into a shared sender +- daemon supervises all configured channels as concurrent Tokio tasks +``` + +**OpenClaw** (Node.js channel plugins): +``` +- Each channel is a module that normalizes to common message format +- Session identity: {channel}:{kind}:{id} +- Multi-agent routing: different channels can route to different workspaces +``` + +### Message Identity Patterns + +| Project | Session Key Format | +|---------|-------------------| +| OpenClaw | `{channel}:{kind}:{id}` (e.g., `slack:dm:U12345`) | +| IronClaw | `thread_ts: Option` on IncomingMessage | +| ZeroClaw | `thread_ts: Option` on ChannelMessage | +| Netclaw (current) | `{channelId}/{threadTs}` | + +### Multi-Process Extensions (Not Default) + +While all default to single-process, each has escape hatches: + +- **OpenClaw**: ACP bridge (stdin/stdout NDJSON) for external agent processes; + Nodes (macOS/iOS/Android companion apps) connect as WebSocket clients +- **IronClaw**: Docker containers for sandboxed tool execution; worker mode + (`ironclaw worker`) for container instances that connect back to orchestrator + via HTTP; Claude Code bridge mode for delegated coding +- **ZeroClaw**: Docker runtime for tool execution isolation; WASM runtime module + for serverless/edge deployment + +## Key Decisions for Netclaw + +1. **Stay single-process** — all three validate this for homelab/personal use +2. **TUI goes through the same pipeline as Slack** — just another channel +3. **Security boundary at the channel layer** — pairing/auth before messages + enter, tool policy inside +4. **Channel abstraction is the key interface** — Slack, TUI, HTTP, timers all + implement the same contract +5. **Layered tool security** — policy filtering → approval gates → sandbox +6. **Session identity from the channel** — channel provides the entity key diff --git a/src/Netclaw.Actors/Channels/ChannelInput.cs b/src/Netclaw.Actors/Channels/ChannelInput.cs new file mode 100644 index 000000000..c5cdd1ac7 --- /dev/null +++ b/src/Netclaw.Actors/Channels/ChannelInput.cs @@ -0,0 +1,32 @@ +using Microsoft.Extensions.AI; + +namespace Netclaw.Actors.Channels; + +/// +/// Strongly-typed inbound message for the session stream API. +/// Supports multi-modal content via from +/// Microsoft.Extensions.AI.Abstractions. +/// +public sealed record ChannelInput +{ + /// + /// Identity of the user who sent this message. + /// + public required string SenderId { get; init; } + + /// + /// Optional message ID for correlation and deduplication. + /// + public string? MessageId { get; init; } + + /// + /// Message content. Supports text (), + /// images, and other modalities via the hierarchy. + /// + public required IReadOnlyList Contents { get; init; } + + /// + /// When the message was received by the channel. + /// + public DateTimeOffset ReceivedAt { get; init; } +} diff --git a/src/Netclaw.Actors/Channels/ChannelPipeline.cs b/src/Netclaw.Actors/Channels/ChannelPipeline.cs new file mode 100644 index 000000000..dc03a65d9 --- /dev/null +++ b/src/Netclaw.Actors/Channels/ChannelPipeline.cs @@ -0,0 +1,167 @@ +using Akka; +using Akka.Actor; +using Akka.Hosting; +using Akka.Streams; +using Akka.Streams.Dsl; +using Microsoft.Extensions.AI; +using Netclaw.Actors.Hosting; +using Netclaw.Actors.Protocol; + +namespace Netclaw.Actors.Channels; + +/// +/// Options for creating a session pipeline. +/// +public sealed record SessionPipelineOptions +{ + /// + /// Channel type identifier (e.g. "console", "headless", "slack"). + /// Used to populate on inbound messages. + /// + public required string ChannelType { get; init; } + + /// + /// Which output categories the channel wants to receive. + /// + public OutputFilter Filter { get; init; } = OutputFilter.Full; +} + +/// +/// Handle to a materialized session. Exposes typed Akka.Streams for +/// bidirectional communication with an LLM session actor — all actor +/// internals (JoinSession, subscriber refs, message routing) are hidden. +/// +public sealed class MaterializedSession : IAsyncDisposable +{ + private readonly SharedKillSwitch _killSwitch; + + internal MaterializedSession( + Sink input, + Source output, + SharedKillSwitch killSwitch) + { + Input = input; + Output = output; + _killSwitch = killSwitch; + } + + /// + /// Input sink. Encapsulates → + /// transformation and delivery to the + /// session manager. Channel connects its own Source: + /// + /// Source.Queue<ChannelInput>(16, Backpressure) + /// .ToMat(session.Input, Keep.Left) + /// .Run(system); + /// + /// + public Sink Input { get; } + + /// + /// Output stream backed by a pre-materialized subscriber actor. + /// Channel connects its own Sink: + /// + /// session.Output + /// .To(Sink.ForEach<SessionOutput>(Render)) + /// .Run(system); + /// + /// + public Source Output { get; } + + /// + /// Gracefully shuts down both inbound and outbound streams + /// via the shared kill switch. + /// + public ValueTask DisposeAsync() + { + _killSwitch.Shutdown(); + return ValueTask.CompletedTask; + } +} + +/// +/// Factory for creating per-session Akka.Streams pipelines. Injected via DI. +/// Channels call to get a +/// without touching actor system internals. +/// +/// +/// Internally wires a subscriber actor (via Source.PreMaterialize) +/// and a command sink (via Sink.ActorRef) to the session manager, +/// with a shared for coordinated teardown. +/// +/// +public sealed class SessionPipeline +{ + private readonly ActorSystem _system; + private readonly IRequiredActor _sessionManagerProvider; + + public SessionPipeline( + ActorSystem system, + IRequiredActor sessionManagerProvider) + { + _system = system; + _sessionManagerProvider = sessionManagerProvider; + } + + /// + /// Creates a materialized session with typed input/output streams. + /// + /// Session identity (channel owns the naming scheme). + /// Pipeline configuration (channel type, output filter). + /// Cancellation token for session manager resolution. + /// A session handle with and + /// streams. + public async Task CreateAsync( + SessionId sessionId, + SessionPipelineOptions options, + CancellationToken cancellationToken = default) + { + var sessionManager = await _sessionManagerProvider.GetAsync(cancellationToken); + var killSwitch = KillSwitches.Shared($"session-{sessionId.Value}"); + + // Pre-materialize subscriber to capture IActorRef before building streams + var (subscriber, responseSource) = Source.ActorRef(256, OverflowStrategy.DropHead) + .PreMaterialize(_system); + + // Inbound: ChannelInput → SendUserMessage → session manager + var inputSink = Flow.Create() + .Select(input => MapToCommand(input, sessionId, options)) + .Via(killSwitch.Flow()) + .To(Sink.ActorRef(sessionManager, Done.Instance, + ex => new Status.Failure(ex))); + + // Outbound: pre-materialized subscriber → kill switch → exposed Source + var outputSource = responseSource + .Via(killSwitch.Flow()); + + // Join the session — subscriber starts receiving output + sessionManager.Tell(new JoinSession + { + SessionId = sessionId, + Subscriber = subscriber, + Filter = options.Filter + }); + + return new MaterializedSession(inputSink, outputSource, killSwitch); + } + + private SendUserMessage MapToCommand( + ChannelInput input, SessionId sessionId, SessionPipelineOptions options) + { + // Extract text content from AIContent list (multi-modal future enhancement) + var textParts = input.Contents.OfType().Select(t => t.Text); + var content = string.Join("\n", textParts); + + return new SendUserMessage + { + SessionId = sessionId, + Content = content, + Source = new MessageSource + { + ChannelType = options.ChannelType, + SenderId = input.SenderId, + ReceivedAt = input.ReceivedAt + } + }; + } +} diff --git a/src/Netclaw.Actors/Channels/MessageSource.cs b/src/Netclaw.Actors/Channels/MessageSource.cs new file mode 100644 index 000000000..38e950c59 --- /dev/null +++ b/src/Netclaw.Actors/Channels/MessageSource.cs @@ -0,0 +1,28 @@ +namespace Netclaw.Actors.Channels; + +/// +/// Ephemeral metadata describing where a user message originated. +/// Used for ACL checks and audit logging — NOT persisted with the session. +/// +public sealed record MessageSource +{ + /// + /// Channel type identifier (e.g. "console", "headless", "slack"). + /// + public required string ChannelType { get; init; } + + /// + /// Identity of the sender within the channel (e.g. Slack user ID, "local-user"). + /// + public required string SenderId { get; init; } + + /// + /// Optional channel-specific identifier (e.g. Slack channel ID). + /// + public string? ChannelId { get; init; } + + /// + /// When the message was received by the channel. + /// + public DateTimeOffset ReceivedAt { get; init; } +} diff --git a/src/Netclaw.Actors/Netclaw.Actors.csproj b/src/Netclaw.Actors/Netclaw.Actors.csproj index 1ccb560f8..3c73a0a67 100644 --- a/src/Netclaw.Actors/Netclaw.Actors.csproj +++ b/src/Netclaw.Actors/Netclaw.Actors.csproj @@ -20,9 +20,7 @@ - + diff --git a/src/Netclaw.Actors/Protocol/Commands.cs b/src/Netclaw.Actors/Protocol/Commands.cs index fb51cbc83..c1172a314 100644 --- a/src/Netclaw.Actors/Protocol/Commands.cs +++ b/src/Netclaw.Actors/Protocol/Commands.cs @@ -1,3 +1,4 @@ +using Netclaw.Actors.Channels; using ProtoBuf; namespace Netclaw.Actors.Protocol; @@ -13,4 +14,10 @@ public sealed class SendUserMessage : IWithSessionId [ProtoMember(2)] public string Content { get; set; } = string.Empty; + + /// + /// Ephemeral channel metadata for ACL/audit. Not persisted. + /// + [ProtoIgnore] + public MessageSource? Source { get; set; } } diff --git a/src/Netclaw.Actors/Protocol/SessionSubscription.cs b/src/Netclaw.Actors/Protocol/SessionSubscription.cs index 2fbfc3412..3298319bd 100644 --- a/src/Netclaw.Actors/Protocol/SessionSubscription.cs +++ b/src/Netclaw.Actors/Protocol/SessionSubscription.cs @@ -72,11 +72,10 @@ public sealed record LeaveSession : IWithSessionId /// /// Acknowledgement sent to the subscriber after successfully joining a session. /// Provides current session state for catch-up. +/// Lifecycle — always delivered regardless of . /// -public sealed record SessionJoined +public sealed record SessionJoined : SessionOutput { - public required SessionId SessionId { get; init; } - /// /// Human-readable session title, if one has been generated. /// May be null for brand-new sessions. diff --git a/src/Netclaw.App/ConsoleAdapter.cs b/src/Netclaw.App/ConsoleAdapter.cs deleted file mode 100644 index 49a5cbc2b..000000000 --- a/src/Netclaw.App/ConsoleAdapter.cs +++ /dev/null @@ -1,219 +0,0 @@ -using Akka.Actor; -using Akka.Hosting; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Netclaw.Actors.Configuration; -using Netclaw.Actors.Hosting; -using Netclaw.Actors.Protocol; - -/// -/// Hosted service that provides a bare console chat loop for proving -/// the actor system works end-to-end with a real LLM. -/// -/// Creates a subscriber actor to receive session outputs and a while loop -/// that reads user input from stdin and sends it to the session actor. -/// -/// All session activity is logged to ~/.netclaw/logs/{sessionId}.log. -/// Console output is reserved exclusively for the chat UI. -/// -/// This is a temporary proof-of-concept adapter. It will be replaced by -/// a proper CLI framework (Cocona) and TUI (Termina) in later tasks. -/// -public sealed class ConsoleAdapter : IHostedService -{ - private readonly IRequiredActor _sessionManagerProvider; - private readonly ActorSystem _actorSystem; - private readonly NetclawPaths _paths; - private readonly IHostApplicationLifetime _lifetime; - private readonly ILogger _logger; - - private CancellationTokenRegistration _shutdownRegistration; - - public ConsoleAdapter( - IRequiredActor sessionManagerProvider, - ActorSystem actorSystem, - NetclawPaths paths, - IHostApplicationLifetime lifetime, - ILogger logger) - { - _sessionManagerProvider = sessionManagerProvider; - _actorSystem = actorSystem; - _paths = paths; - _lifetime = lifetime; - _logger = logger; - } - - public Task StartAsync(CancellationToken cancellationToken) - { - // Run the chat loop on a background thread so we don't block host startup - _shutdownRegistration = _lifetime.ApplicationStarted.Register(() => - { - _ = Task.Run(() => RunChatLoopAsync(_lifetime.ApplicationStopping), CancellationToken.None); - }); - - return Task.CompletedTask; - } - - public Task StopAsync(CancellationToken cancellationToken) - { - _shutdownRegistration.Dispose(); - return Task.CompletedTask; - } - - private async Task RunChatLoopAsync(CancellationToken stopping) - { - try - { - var sessionManager = await _sessionManagerProvider.GetAsync(stopping); - var sessionId = new SessionId($"tui/{Guid.NewGuid():N}"); - - // Set up session log file - _paths.EnsureDirectoriesExist(); - var logFileName = $"{sessionId.Value.Replace("/", "-")}.log"; - var logPath = Path.Combine(_paths.LogsDirectory, logFileName); - var logWriter = new StreamWriter(logPath, append: false) { AutoFlush = true }; - - logWriter.WriteLine($"[{DateTimeOffset.UtcNow:o}] Session started: {sessionId}"); - - // Create subscriber actor that writes session output to console + log - var subscriber = _actorSystem.ActorOf( - Props.Create(() => new ConsoleSubscriberActor(logWriter)), - $"console-subscriber-{sessionId.Value.Replace("/", "-")}"); - - // Join the session - sessionManager.Tell(new JoinSession - { - SessionId = sessionId, - Subscriber = subscriber, - Filter = OutputFilter.Full - }); - - _logger.LogInformation("Session started: {SessionId} (log: {LogPath})", sessionId, logPath); - Console.WriteLine(); - Console.WriteLine($"Netclaw console chat (log: {logPath})"); - Console.WriteLine("Type 'exit' to quit."); - Console.WriteLine("──────────────────────────────────────────"); - Console.WriteLine(); - - while (!stopping.IsCancellationRequested) - { - Console.Write("You> "); - var input = Console.ReadLine(); - - if (input is null || string.Equals(input.Trim(), "exit", StringComparison.OrdinalIgnoreCase)) - { - logWriter.WriteLine($"[{DateTimeOffset.UtcNow:o}] User exited chat"); - _lifetime.StopApplication(); - break; - } - - if (string.IsNullOrWhiteSpace(input)) - continue; - - logWriter.WriteLine($"[{DateTimeOffset.UtcNow:o}] USER: {input}"); - - sessionManager.Tell(new SendUserMessage - { - SessionId = sessionId, - Content = input - }); - } - } - catch (OperationCanceledException ex) - { - _logger.LogDebug(ex, "Console chat loop cancelled (shutdown)"); - } - catch (Exception ex) - { - _logger.LogError(ex, "Console chat loop failed"); - _lifetime.StopApplication(); - } - } -} - -/// -/// Minimal actor that receives session outputs and writes them to the console -/// and a per-session log file. Needed because session outputs are delivered via -/// Akka Tell to an IActorRef subscriber. -/// -public sealed class ConsoleSubscriberActor : ReceiveActor -{ - private readonly StreamWriter _log; - - public ConsoleSubscriberActor(StreamWriter logWriter) - { - _log = logWriter; - - Receive(msg => - { - Log($"SESSION_JOINED turn_count={msg.TurnCount} title={msg.Title ?? "(none)"}"); - }); - - Receive(msg => - { - Console.WriteLine(); - Console.WriteLine($"Netclaw> {msg.Text}"); - Console.WriteLine(); - Log($"ASSISTANT: {msg.Text}"); - }); - - Receive(msg => - { - Console.ForegroundColor = ConsoleColor.DarkGray; - Console.WriteLine($" [thinking] {msg.Text}"); - Console.ResetColor(); - Log($"THINKING: {msg.Text}"); - }); - - Receive(msg => - { - Console.ForegroundColor = ConsoleColor.Cyan; - Console.WriteLine($" [tool] {msg.ToolName}({msg.ArgumentsJson ?? ""})"); - Console.ResetColor(); - Log($"TOOL_CALL: {msg.ToolName} call_id={msg.CallId} args={msg.ArgumentsJson ?? "{}"}"); - }); - - Receive(msg => - { - var usage = msg.UsagePercent.HasValue - ? $" ({msg.UsagePercent.Value:P0} context)" - : ""; - Log($"USAGE: in={msg.InputTokens} out={msg.OutputTokens} total={msg.TotalTokens} cached={msg.CachedInputTokens} reasoning={msg.ReasoningTokens} context_window={msg.ContextWindowTokens}{usage}"); - }); - - Receive(msg => - { - Console.ForegroundColor = ConsoleColor.Red; - Console.WriteLine($" [error] {msg.Message}"); - Console.ResetColor(); - Log($"ERROR: {msg.Message}"); - if (msg.Cause is not null) - Log($"EXCEPTION: {msg.Cause}"); - }); - - Receive(msg => - { - Log($"TURN_COMPLETED: turn={msg.TurnNumber}"); - }); - - Receive(msg => - { - Console.ForegroundColor = ConsoleColor.Yellow; - Console.WriteLine($" [compaction] {msg.MessagesBefore} → {msg.MessagesAfter} messages"); - Console.ResetColor(); - Log($"COMPACTION: before={msg.MessagesBefore} after={msg.MessagesAfter} tool_results_cleared={msg.ToolResultsCleared} summarized={msg.Summarized}"); - }); - } - - private void Log(string message) - { - _log.WriteLine($"[{DateTimeOffset.UtcNow:o}] {message}"); - } - - protected override void PostStop() - { - Log("SESSION_ENDED"); - _log.Dispose(); - base.PostStop(); - } -} diff --git a/src/Netclaw.App/ConsoleChannel.cs b/src/Netclaw.App/ConsoleChannel.cs new file mode 100644 index 000000000..342cd93ad --- /dev/null +++ b/src/Netclaw.App/ConsoleChannel.cs @@ -0,0 +1,215 @@ +using Akka.Actor; +using Akka.Streams; +using Akka.Streams.Dsl; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Netclaw.Actors.Channels; +using Netclaw.Actors.Configuration; +using Netclaw.Actors.Protocol; +using Netclaw.Channels; + +namespace Netclaw.App; + +/// +/// Interactive console channel. Reads user input from stdin and renders +/// session output to stdout with color formatting. +/// +/// Creates a single session for the channel's lifetime using a +/// for stream-based communication. +/// +/// All session activity is logged to ~/.netclaw/logs/{sessionId}.log. +/// Console output is reserved exclusively for the chat UI. +/// +public sealed class ConsoleChannel : IChannel +{ + private readonly SessionPipeline _pipeline; + private readonly ActorSystem _system; + private readonly NetclawPaths _paths; + private readonly IHostApplicationLifetime _lifetime; + private readonly TimeProvider _timeProvider; + private readonly ILogger _logger; + + private CancellationTokenRegistration _shutdownRegistration; + private MaterializedSession? _session; + + public string ChannelType => "console"; + public string DisplayName => "Console Chat"; + + public ChannelHealth GetHealth() => _session is not null + ? new ChannelHealth(ChannelHealthStatus.Healthy) + : new ChannelHealth(ChannelHealthStatus.Disconnected, "No active session"); + + public ConsoleChannel( + SessionPipeline pipeline, + ActorSystem system, + NetclawPaths paths, + IHostApplicationLifetime lifetime, + TimeProvider timeProvider, + ILogger logger) + { + _pipeline = pipeline; + _system = system; + _paths = paths; + _lifetime = lifetime; + _timeProvider = timeProvider; + _logger = logger; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + _shutdownRegistration = _lifetime.ApplicationStarted.Register(() => + { + _ = Task.Run(() => RunChatLoopAsync(_lifetime.ApplicationStopping), CancellationToken.None); + }); + + return Task.CompletedTask; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + if (_session is not null) + await _session.DisposeAsync(); + _shutdownRegistration.Dispose(); + } + + private async Task RunChatLoopAsync(CancellationToken stopping) + { + try + { + var sessionId = new SessionId($"console/{Guid.NewGuid():N}"); + + // Set up session log file + _paths.EnsureDirectoriesExist(); + var logFileName = $"{sessionId.Value.Replace("/", "-")}.log"; + var logPath = Path.Combine(_paths.LogsDirectory, logFileName); + var logWriter = new StreamWriter(logPath, append: false) { AutoFlush = true }; + + logWriter.WriteLine($"[{_timeProvider.GetUtcNow():o}] Session started: {sessionId}"); + + // Create session pipeline + _session = await _pipeline.CreateAsync(sessionId, new SessionPipelineOptions + { + ChannelType = ChannelType + }, stopping); + + // Materialize output stream → console rendering + disk logging + _session.Output + .To(Sink.ForEach(output => RenderOutput(output, logWriter))) + .Run(_system); + + // Materialize input with queue for imperative push from readline + var inputQueue = Source.Queue(16, OverflowStrategy.Backpressure) + .ToMaterialized(_session.Input, Keep.Left) + .Run(_system); + + _logger.LogInformation("Session started: {SessionId} (log: {LogPath})", sessionId, logPath); + Console.WriteLine(); + Console.WriteLine($"Netclaw console chat (log: {logPath})"); + Console.WriteLine("Type 'exit' to quit."); + Console.WriteLine("──────────────────────────────────────────"); + Console.WriteLine(); + + while (!stopping.IsCancellationRequested) + { + Console.Write("You> "); + var input = Console.ReadLine(); + + if (input is null || string.Equals(input.Trim(), "exit", StringComparison.OrdinalIgnoreCase)) + { + logWriter.WriteLine($"[{_timeProvider.GetUtcNow():o}] User exited chat"); + _lifetime.StopApplication(); + break; + } + + if (string.IsNullOrWhiteSpace(input)) + continue; + + logWriter.WriteLine($"[{_timeProvider.GetUtcNow():o}] USER: {input}"); + + await inputQueue.OfferAsync(new ChannelInput + { + SenderId = "local-user", + Contents = [new TextContent(input)], + ReceivedAt = _timeProvider.GetUtcNow() + }); + } + } + catch (OperationCanceledException ex) + { + _logger.LogDebug(ex, "Console chat loop cancelled (shutdown)"); + } + catch (Exception ex) + { + _logger.LogError(ex, "Console chat loop failed"); + _lifetime.StopApplication(); + } + } + + private static void RenderOutput(SessionOutput output, StreamWriter log) + { + switch (output) + { + case SessionJoined msg: + Log(log, $"SESSION_JOINED turn_count={msg.TurnCount} title={msg.Title ?? "(none)"}"); + break; + + case TextOutput msg: + Console.WriteLine(); + Console.WriteLine($"Netclaw> {msg.Text}"); + Console.WriteLine(); + Log(log, $"ASSISTANT: {msg.Text}"); + break; + + case ThinkingOutput msg: + Console.ForegroundColor = ConsoleColor.DarkGray; + Console.WriteLine($" [thinking] {msg.Text}"); + Console.ResetColor(); + Log(log, $"THINKING: {msg.Text}"); + break; + + case ToolCallOutput msg: + Console.ForegroundColor = ConsoleColor.Cyan; + Console.WriteLine($" [tool] {msg.ToolName}({msg.ArgumentsJson ?? ""})"); + Console.ResetColor(); + Log(log, $"TOOL_CALL: {msg.ToolName} call_id={msg.CallId} args={msg.ArgumentsJson ?? "{}"}"); + break; + + case ToolResultOutput msg: + Log(log, $"TOOL_RESULT: {msg.ToolName} call_id={msg.CallId} result={msg.Result}"); + break; + + case UsageOutput msg: + var usage = msg.UsagePercent.HasValue + ? $" ({msg.UsagePercent.Value:P0} context)" + : ""; + Log(log, $"USAGE: in={msg.InputTokens} out={msg.OutputTokens} total={msg.TotalTokens} cached={msg.CachedInputTokens} reasoning={msg.ReasoningTokens} context_window={msg.ContextWindowTokens}{usage}"); + break; + + case ErrorOutput msg: + Console.ForegroundColor = ConsoleColor.Red; + Console.WriteLine($" [error] {msg.Message}"); + Console.ResetColor(); + Log(log, $"ERROR: {msg.Message}"); + if (msg.Cause is not null) + Log(log, $"EXCEPTION: {msg.Cause}"); + break; + + case TurnCompleted msg: + Log(log, $"TURN_COMPLETED: turn={msg.TurnNumber}"); + break; + + case CompactionOutput msg: + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine($" [compaction] {msg.MessagesBefore} → {msg.MessagesAfter} messages"); + Console.ResetColor(); + Log(log, $"COMPACTION: before={msg.MessagesBefore} after={msg.MessagesAfter} tool_results_cleared={msg.ToolResultsCleared} summarized={msg.Summarized}"); + break; + } + } + + private static void Log(StreamWriter log, string message) + { + log.WriteLine($"[{DateTimeOffset.UtcNow:o}] {message}"); + } +} diff --git a/src/Netclaw.App/HeadlessAdapter.cs b/src/Netclaw.App/HeadlessAdapter.cs deleted file mode 100644 index b92bdd06c..000000000 --- a/src/Netclaw.App/HeadlessAdapter.cs +++ /dev/null @@ -1,197 +0,0 @@ -using Akka.Actor; -using Akka.Hosting; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Netclaw.Actors.Configuration; -using Netclaw.Actors.Hosting; -using Netclaw.Actors.Protocol; -using Netclaw.App; - -/// -/// Hosted service that sends a single prompt to the LLM session and streams -/// all output (tool calls, results, text, usage) to stdout, then exits. -/// -/// This is the -p / --prompt headless mode — same UX concept -/// as claude -p. Useful for smoke-testing tool discovery and invocation -/// against different models without a human in the loop. -/// -public sealed class HeadlessAdapter : IHostedService -{ - private readonly IRequiredActor _sessionManagerProvider; - private readonly ActorSystem _actorSystem; - private readonly NetclawPaths _paths; - private readonly IHostApplicationLifetime _lifetime; - private readonly HeadlessOptions _options; - private readonly ILogger _logger; - - private CancellationTokenRegistration _shutdownRegistration; - - public HeadlessAdapter( - IRequiredActor sessionManagerProvider, - ActorSystem actorSystem, - NetclawPaths paths, - IHostApplicationLifetime lifetime, - HeadlessOptions options, - ILogger logger) - { - _sessionManagerProvider = sessionManagerProvider; - _actorSystem = actorSystem; - _paths = paths; - _lifetime = lifetime; - _options = options; - _logger = logger; - } - - public Task StartAsync(CancellationToken cancellationToken) - { - _shutdownRegistration = _lifetime.ApplicationStarted.Register(() => - { - _ = Task.Run(() => RunHeadlessAsync(_lifetime.ApplicationStopping), CancellationToken.None); - }); - - return Task.CompletedTask; - } - - public Task StopAsync(CancellationToken cancellationToken) - { - _shutdownRegistration.Dispose(); - return Task.CompletedTask; - } - - private async Task RunHeadlessAsync(CancellationToken stopping) - { - try - { - var sessionManager = await _sessionManagerProvider.GetAsync(stopping); - var sessionId = new SessionId($"headless/{Guid.NewGuid():N}"); - - // Set up session log file - _paths.EnsureDirectoriesExist(); - var logFileName = $"{sessionId.Value.Replace("/", "-")}.log"; - var logPath = Path.Combine(_paths.LogsDirectory, logFileName); - var logWriter = new StreamWriter(logPath, append: false) { AutoFlush = true }; - - logWriter.WriteLine($"[{DateTimeOffset.UtcNow:o}] Headless session started: {sessionId}"); - logWriter.WriteLine($"[{DateTimeOffset.UtcNow:o}] PROMPT: {_options.Prompt}"); - - // Create subscriber actor that writes session output to stdout + log - var subscriber = _actorSystem.ActorOf( - Props.Create(() => new HeadlessSubscriberActor(logWriter, _lifetime)), - $"headless-subscriber-{sessionId.Value.Replace("/", "-")}"); - - // Join the session with full output (tool calls, thinking, usage) - sessionManager.Tell(new JoinSession - { - SessionId = sessionId, - Subscriber = subscriber, - Filter = OutputFilter.Full - }); - - // Send the single prompt - sessionManager.Tell(new SendUserMessage - { - SessionId = sessionId, - Content = _options.Prompt - }); - - _logger.LogInformation("Headless session started: {SessionId} (log: {LogPath})", sessionId, logPath); - } - catch (OperationCanceledException ex) - { - _logger.LogDebug(ex, "Headless adapter cancelled (shutdown)"); - } - catch (Exception ex) - { - _logger.LogError(ex, "Headless adapter failed"); - _lifetime.StopApplication(); - } - } -} - -/// -/// Subscriber actor for headless mode. Streams all session output to stdout -/// in a human-readable format and shuts down the application on . -/// -public sealed class HeadlessSubscriberActor : ReceiveActor -{ - private readonly StreamWriter _log; - private readonly IHostApplicationLifetime _lifetime; - - public HeadlessSubscriberActor(StreamWriter logWriter, IHostApplicationLifetime lifetime) - { - _log = logWriter; - _lifetime = lifetime; - - Receive(msg => - { - Log($"SESSION_JOINED turn_count={msg.TurnCount} title={msg.Title ?? "(none)"}"); - }); - - Receive(msg => - { - Console.WriteLine(msg.Text); - Log($"ASSISTANT: {msg.Text}"); - }); - - Receive(msg => - { - Console.WriteLine($"[thinking] {msg.Text}"); - Log($"THINKING: {msg.Text}"); - }); - - Receive(msg => - { - Console.WriteLine($"[tool:call] {msg.ToolName}({msg.ArgumentsJson ?? ""})"); - Log($"TOOL_CALL: {msg.ToolName} call_id={msg.CallId} args={msg.ArgumentsJson ?? "{}"}"); - }); - - Receive(msg => - { - Console.WriteLine($"[tool:result] {msg.ToolName} \u2192 {msg.Result}"); - Log($"TOOL_RESULT: {msg.ToolName} call_id={msg.CallId} result={msg.Result}"); - }); - - Receive(msg => - { - Console.WriteLine($"[usage] in={msg.InputTokens} out={msg.OutputTokens} total={msg.TotalTokens}"); - Log($"USAGE: in={msg.InputTokens} out={msg.OutputTokens} total={msg.TotalTokens} cached={msg.CachedInputTokens} reasoning={msg.ReasoningTokens} context_window={msg.ContextWindowTokens}"); - }); - - Receive(msg => - { - Console.Error.WriteLine($"[error] {msg.Message}"); - Log($"ERROR: {msg.Message}"); - if (msg.Cause is not null) - Log($"EXCEPTION: {msg.Cause}"); - }); - - Receive(msg => - { - Log($"TURN_COMPLETED: turn={msg.TurnNumber}"); - _lifetime.StopApplication(); - }); - - Receive(msg => - { - Console.WriteLine($"[compaction] {msg.MessagesBefore} \u2192 {msg.MessagesAfter} messages"); - Log($"COMPACTION: before={msg.MessagesBefore} after={msg.MessagesAfter} tool_results_cleared={msg.ToolResultsCleared} summarized={msg.Summarized}"); - }); - - Receive(_ => - { - // Ignored in headless mode — no UI to update - }); - } - - private void Log(string message) - { - _log.WriteLine($"[{DateTimeOffset.UtcNow:o}] {message}"); - } - - protected override void PostStop() - { - Log("SESSION_ENDED"); - _log.Dispose(); - base.PostStop(); - } -} diff --git a/src/Netclaw.App/HeadlessChannel.cs b/src/Netclaw.App/HeadlessChannel.cs new file mode 100644 index 000000000..9f356ebb5 --- /dev/null +++ b/src/Netclaw.App/HeadlessChannel.cs @@ -0,0 +1,176 @@ +using Akka.Actor; +using Akka.Streams; +using Akka.Streams.Dsl; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Netclaw.Actors.Channels; +using Netclaw.Actors.Configuration; +using Netclaw.Actors.Protocol; +using Netclaw.Channels; + +namespace Netclaw.App; + +/// +/// Headless channel for single-prompt mode (-p / --prompt). +/// Sends one message to the LLM session, streams all output to stdout, +/// and exits on . +/// +public sealed class HeadlessChannel : IChannel +{ + private readonly SessionPipeline _pipeline; + private readonly ActorSystem _system; + private readonly NetclawPaths _paths; + private readonly IHostApplicationLifetime _lifetime; + private readonly TimeProvider _timeProvider; + private readonly string _prompt; + private readonly ILogger _logger; + + private MaterializedSession? _session; + + public string ChannelType => "headless"; + public string DisplayName => "Headless Prompt"; + + public ChannelHealth GetHealth() => _session is not null + ? new ChannelHealth(ChannelHealthStatus.Healthy) + : new ChannelHealth(ChannelHealthStatus.Disconnected, "No active session"); + + public HeadlessChannel( + SessionPipeline pipeline, + ActorSystem system, + NetclawPaths paths, + IHostApplicationLifetime lifetime, + TimeProvider timeProvider, + string prompt, + ILogger logger) + { + _pipeline = pipeline; + _system = system; + _paths = paths; + _lifetime = lifetime; + _timeProvider = timeProvider; + _prompt = prompt; + _logger = logger; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + _ = Task.Run(() => RunHeadlessAsync(_lifetime.ApplicationStopping), CancellationToken.None); + return Task.CompletedTask; + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + if (_session is not null) + await _session.DisposeAsync(); + } + + private async Task RunHeadlessAsync(CancellationToken stopping) + { + try + { + var sessionId = new SessionId($"headless/{Guid.NewGuid():N}"); + + // Set up session log file + _paths.EnsureDirectoriesExist(); + var logFileName = $"{sessionId.Value.Replace("/", "-")}.log"; + var logPath = Path.Combine(_paths.LogsDirectory, logFileName); + var logWriter = new StreamWriter(logPath, append: false) { AutoFlush = true }; + + logWriter.WriteLine($"[{_timeProvider.GetUtcNow():o}] Headless session started: {sessionId}"); + logWriter.WriteLine($"[{_timeProvider.GetUtcNow():o}] PROMPT: {_prompt}"); + + // Create session pipeline + _session = await _pipeline.CreateAsync(sessionId, new SessionPipelineOptions + { + ChannelType = ChannelType + }, stopping); + + // Materialize output stream → console + disk logging, exit on TurnCompleted + _session.Output + .To(Sink.ForEach(output => HandleOutput(output, logWriter))) + .Run(_system); + + // Materialize input with queue and send the single prompt + var inputQueue = Source.Queue(16, OverflowStrategy.Backpressure) + .ToMaterialized(_session.Input, Keep.Left) + .Run(_system); + + await inputQueue.OfferAsync(new ChannelInput + { + SenderId = "local-user", + Contents = [new TextContent(_prompt)], + ReceivedAt = _timeProvider.GetUtcNow() + }); + + _logger.LogInformation("Headless session started: {SessionId} (log: {LogPath})", sessionId, logPath); + } + catch (OperationCanceledException ex) + { + _logger.LogDebug(ex, "Headless channel cancelled (shutdown)"); + } + catch (Exception ex) + { + _logger.LogError(ex, "Headless channel failed"); + _lifetime.StopApplication(); + } + } + + private void HandleOutput(SessionOutput output, StreamWriter log) + { + switch (output) + { + case SessionJoined msg: + Log(log, $"SESSION_JOINED turn_count={msg.TurnCount} title={msg.Title ?? "(none)"}"); + break; + + case TextOutput msg: + Console.WriteLine(msg.Text); + Log(log, $"ASSISTANT: {msg.Text}"); + break; + + case ThinkingOutput msg: + Console.WriteLine($"[thinking] {msg.Text}"); + Log(log, $"THINKING: {msg.Text}"); + break; + + case ToolCallOutput msg: + Console.WriteLine($"[tool:call] {msg.ToolName}({msg.ArgumentsJson ?? ""})"); + Log(log, $"TOOL_CALL: {msg.ToolName} call_id={msg.CallId} args={msg.ArgumentsJson ?? "{}"}"); + break; + + case ToolResultOutput msg: + Console.WriteLine($"[tool:result] {msg.ToolName} \u2192 {msg.Result}"); + Log(log, $"TOOL_RESULT: {msg.ToolName} call_id={msg.CallId} result={msg.Result}"); + break; + + case UsageOutput msg: + Console.WriteLine($"[usage] in={msg.InputTokens} out={msg.OutputTokens} total={msg.TotalTokens}"); + Log(log, $"USAGE: in={msg.InputTokens} out={msg.OutputTokens} total={msg.TotalTokens} cached={msg.CachedInputTokens} reasoning={msg.ReasoningTokens} context_window={msg.ContextWindowTokens}"); + break; + + case ErrorOutput msg: + Console.Error.WriteLine($"[error] {msg.Message}"); + Log(log, $"ERROR: {msg.Message}"); + if (msg.Cause is not null) + Log(log, $"EXCEPTION: {msg.Cause}"); + break; + + case TurnCompleted msg: + Log(log, $"TURN_COMPLETED: turn={msg.TurnNumber}"); + Log(log, "SESSION_ENDED"); + _lifetime.StopApplication(); + break; + + case CompactionOutput msg: + Console.WriteLine($"[compaction] {msg.MessagesBefore} \u2192 {msg.MessagesAfter} messages"); + Log(log, $"COMPACTION: before={msg.MessagesBefore} after={msg.MessagesAfter} tool_results_cleared={msg.ToolResultsCleared} summarized={msg.Summarized}"); + break; + } + } + + private static void Log(StreamWriter log, string message) + { + log.WriteLine($"[{DateTimeOffset.UtcNow:o}] {message}"); + } +} diff --git a/src/Netclaw.App/HeadlessOptions.cs b/src/Netclaw.App/HeadlessOptions.cs deleted file mode 100644 index 3c76b5f93..000000000 --- a/src/Netclaw.App/HeadlessOptions.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace Netclaw.App; - -public sealed class HeadlessOptions -{ - public required string Prompt { get; init; } -} diff --git a/src/Netclaw.App/Netclaw.App.csproj b/src/Netclaw.App/Netclaw.App.csproj index 64e0f519b..d07461724 100644 --- a/src/Netclaw.App/Netclaw.App.csproj +++ b/src/Netclaw.App/Netclaw.App.csproj @@ -17,6 +17,7 @@ + diff --git a/src/Netclaw.App/Program.cs b/src/Netclaw.App/Program.cs index 8cc06e6bf..e16e9a665 100644 --- a/src/Netclaw.App/Program.cs +++ b/src/Netclaw.App/Program.cs @@ -5,7 +5,9 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Netclaw.Actors.Channels; using Netclaw.Actors.Configuration; +using Netclaw.Channels; using Netclaw.Actors.Hosting; using Netclaw.Actors.Sessions; using Netclaw.Actors.Tools; @@ -39,6 +41,9 @@ builder.Logging.ClearProviders(); builder.Logging.SetMinimumLevel(LogLevel.Warning); +// -- TimeProvider -- +builder.Services.AddSingleton(TimeProvider.System); + // -- Ollama IChatClient -- var ollamaUrl = builder.Configuration["Ollama:Url"] ?? "http://localhost:11434"; var ollamaModel = builder.Configuration["Ollama:Model"] ?? "qwen3:30b"; @@ -82,15 +87,22 @@ .WithNetclawActors(); }); -// -- Adapter selection -- +// -- Session pipeline (stream API for channels) -- +builder.Services.AddSingleton(); + +// -- Channel selection -- if (headlessPrompt is not null) { - builder.Services.AddSingleton(new HeadlessOptions { Prompt = headlessPrompt }); - builder.Services.AddHostedService(); + builder.Services.AddSingleton(sp => + ActivatorUtilities.CreateInstance(sp, headlessPrompt)); + builder.Services.AddSingleton(sp => sp.GetRequiredService()); + builder.Services.AddSingleton(sp => sp.GetRequiredService()); } else { - builder.Services.AddHostedService(); + builder.Services.AddSingleton(); + builder.Services.AddSingleton(sp => sp.GetRequiredService()); + builder.Services.AddSingleton(sp => sp.GetRequiredService()); } await builder.Build().RunAsync(); diff --git a/src/Netclaw.Channels/IChannel.cs b/src/Netclaw.Channels/IChannel.cs new file mode 100644 index 000000000..d8f036258 --- /dev/null +++ b/src/Netclaw.Channels/IChannel.cs @@ -0,0 +1,25 @@ +using Microsoft.Extensions.Hosting; + +namespace Netclaw.Channels; + +/// +/// Marker interface for input/output channels. Each channel is a hosted service +/// that manages one or more sessions through Akka.Streams pipelines. +/// +public interface IChannel : IHostedService +{ + string ChannelType { get; } + + string DisplayName { get; } + + ChannelHealth GetHealth(); +} + +public enum ChannelHealthStatus +{ + Healthy, + Degraded, + Disconnected +} + +public sealed record ChannelHealth(ChannelHealthStatus Status, string? Detail = null); diff --git a/src/Netclaw.Channels/Netclaw.Channels.csproj b/src/Netclaw.Channels/Netclaw.Channels.csproj new file mode 100644 index 000000000..31a8e0ac7 --- /dev/null +++ b/src/Netclaw.Channels/Netclaw.Channels.csproj @@ -0,0 +1,17 @@ + + + + net10.0 + enable + enable + + + + + + + + + + +