diff --git a/apps/desktop/src/main/todo-agent/supervisor.ts b/apps/desktop/src/main/todo-agent/supervisor.ts index e3111c9c9e4..2129360d2da 100644 --- a/apps/desktop/src/main/todo-agent/supervisor.ts +++ b/apps/desktop/src/main/todo-agent/supervisor.ts @@ -5,6 +5,7 @@ import path from "node:path"; import type { SelectTodoSession } from "@superset/local-db"; import { getCurrentHeadSha } from "./git-status"; import { getTodoSessionStore, resolveWorktreePath } from "./session-store"; +import { getTodoSettings } from "./settings"; import type { TodoStreamEventKind } from "./types"; import { TODO_ARTIFACT_SUBDIR } from "./types"; @@ -46,7 +47,13 @@ interface ActiveRun { } class TodoSupervisor { - private active: ActiveRun | undefined; + /** + * Currently executing sessions keyed by sessionId. The size of this map + * is compared against `maxConcurrentTasks` in `drain()` to decide whether + * the next pending session can start. Keyed storage (as opposed to a + * single slot) lets `abort()` target a specific run without scanning. + */ + private readonly active = new Map(); private readonly queue: string[] = []; /** @@ -81,12 +88,38 @@ class TodoSupervisor { } async start(sessionId: string): Promise { - if (this.active) { - if (!this.queue.includes(sessionId)) this.queue.push(sessionId); - return; - } - await this.runSession(sessionId); - while (this.queue.length > 0) { + // Already executing or queued — no-op so repeated `start` calls + // from the UI do not create duplicate work. + if (this.active.has(sessionId)) return; + if (this.queue.includes(sessionId)) return; + this.queue.push(sessionId); + this.drain(); + } + + /** + * Called by the settings mutation after `maxConcurrentTasks` changes. + * When the user raises the concurrency cap we need to pull the next + * pending sessions from the queue immediately — otherwise they sit + * idle until the currently active session completes, which is the + * exact symptom reported in issue #220. Lowering the cap is handled + * passively (new starts are blocked until capacity frees up; already + * running sessions keep running). + */ + handleSettingsChanged(): void { + this.drain(); + } + + /** + * Launch as many queued sessions as `maxConcurrentTasks` permits. + * Synchronous: each launch kicks off `runSession` as a fire-and- + * forget Promise whose `finally` loops back into `drain()` so the + * next slot fills as soon as a session finishes. The settings value + * is re-read on every call so live setting updates take effect + * without restart. + */ + private drain(): void { + const capacity = getTodoSettings().maxConcurrentTasks; + while (this.active.size < capacity && this.queue.length > 0) { const next = this.queue.shift(); if (!next) continue; // A session can be aborted / deleted / rerun while still @@ -103,7 +136,20 @@ class TodoSupervisor { ) { continue; } - await this.runSession(next); + // `runSession` sets `this.active[sessionId]` synchronously + // before its first `await`, so by the time control returns + // here the slot count reflects the new run and the while + // loop's capacity check stays accurate. + void this.runSession(next) + .catch((err) => { + console.warn( + `[todo-supervisor] runSession crashed for ${next}:`, + err, + ); + }) + .finally(() => { + this.drain(); + }); } } @@ -116,8 +162,9 @@ class TodoSupervisor { if (queueIdx !== -1) { this.queue.splice(queueIdx, 1); } - if (this.active?.sessionId === sessionId) { - this.active.abortController.abort(); + const activeRun = this.active.get(sessionId); + if (activeRun) { + activeRun.abortController.abort(); // Kill the whole process group, not just the direct child. // `claude -p` spawns its own children (the Node-side agent // loop, MCP servers, tool helpers). A plain `child.kill()` @@ -127,7 +174,7 @@ class TodoSupervisor { // actually stop. We `spawn` with `detached: true` so the // child becomes a session leader; here we signal the // negative PID to reach every descendant. - const child = this.active.currentChild; + const child = activeRun.currentChild; if (child?.pid) { const pid = child.pid; killProcessTree(pid, "SIGINT"); @@ -211,7 +258,7 @@ class TodoSupervisor { startedAt: Date.now(), currentChild: null, }; - this.active = run; + this.active.set(sessionId, run); try { appendSetupEvent( @@ -550,7 +597,7 @@ class TodoSupervisor { }); } } finally { - this.active = undefined; + this.active.delete(sessionId); } } diff --git a/apps/desktop/src/main/todo-agent/trpc-router.ts b/apps/desktop/src/main/todo-agent/trpc-router.ts index 05e679a8929..0a83211e003 100644 --- a/apps/desktop/src/main/todo-agent/trpc-router.ts +++ b/apps/desktop/src/main/todo-agent/trpc-router.ts @@ -649,7 +649,15 @@ export const createTodoAgentRouter = () => { get: publicProcedure.query(() => getTodoSettings()), update: publicProcedure .input(todoSettingsUpdateSchema) - .mutation(({ input }) => updateTodoSettings(input)), + .mutation(({ input }) => { + const next = updateTodoSettings(input); + // Nudge the supervisor so a raised `maxConcurrentTasks` + // immediately releases queued sessions. Without this, a + // bump from 1 → N leaves already-pending tasks waiting + // until the currently running session finishes. + getTodoSupervisor().handleSettingsChanged(); + return next; + }), }), schedule: router({