Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 60 additions & 13 deletions apps/desktop/src/main/todo-agent/supervisor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import path from "node:path";
import type { SelectTodoSession } from "@superset/local-db";
import { getCurrentHeadSha } from "./git-status";
import { getTodoSessionStore, resolveWorktreePath } from "./session-store";
import { getTodoSettings } from "./settings";
import type { TodoStreamEventKind } from "./types";
import { TODO_ARTIFACT_SUBDIR } from "./types";

Expand Down Expand Up @@ -46,7 +47,13 @@ interface ActiveRun {
}

class TodoSupervisor {
private active: ActiveRun | undefined;
/**
* Currently executing sessions keyed by sessionId. The size of this map
* is compared against `maxConcurrentTasks` in `drain()` to decide whether
* the next pending session can start. Keyed storage (as opposed to a
* single slot) lets `abort()` target a specific run without scanning.
*/
private readonly active = new Map<string, ActiveRun>();
private readonly queue: string[] = [];

/**
Expand Down Expand Up @@ -81,12 +88,38 @@ class TodoSupervisor {
}

async start(sessionId: string): Promise<void> {
if (this.active) {
if (!this.queue.includes(sessionId)) this.queue.push(sessionId);
return;
}
await this.runSession(sessionId);
while (this.queue.length > 0) {
// Already executing or queued — no-op so repeated `start` calls
// from the UI do not create duplicate work.
if (this.active.has(sessionId)) return;
if (this.queue.includes(sessionId)) return;
this.queue.push(sessionId);
this.drain();
}

/**
* Called by the settings mutation after `maxConcurrentTasks` changes.
* When the user raises the concurrency cap we need to pull the next
* pending sessions from the queue immediately — otherwise they sit
* idle until the currently active session completes, which is the
* exact symptom reported in issue #220. Lowering the cap is handled
* passively (new starts are blocked until capacity frees up; already
* running sessions keep running).
*/
handleSettingsChanged(): void {
this.drain();
}

/**
* Launch as many queued sessions as `maxConcurrentTasks` permits.
* Synchronous: each launch kicks off `runSession` as a fire-and-
* forget Promise whose `finally` loops back into `drain()` so the
* next slot fills as soon as a session finishes. The settings value
* is re-read on every call so live setting updates take effect
* without restart.
*/
private drain(): void {
const capacity = getTodoSettings().maxConcurrentTasks;
while (this.active.size < capacity && this.queue.length > 0) {
const next = this.queue.shift();
if (!next) continue;
// A session can be aborted / deleted / rerun while still
Expand All @@ -103,7 +136,20 @@ class TodoSupervisor {
) {
continue;
}
await this.runSession(next);
// `runSession` sets `this.active[sessionId]` synchronously
// before its first `await`, so by the time control returns
// here the slot count reflects the new run and the while
// loop's capacity check stays accurate.
void this.runSession(next)
.catch((err) => {
console.warn(
`[todo-supervisor] runSession crashed for ${next}:`,
err,
);
})
.finally(() => {
this.drain();
});
}
}

Expand All @@ -116,8 +162,9 @@ class TodoSupervisor {
if (queueIdx !== -1) {
this.queue.splice(queueIdx, 1);
}
if (this.active?.sessionId === sessionId) {
this.active.abortController.abort();
const activeRun = this.active.get(sessionId);
if (activeRun) {
activeRun.abortController.abort();
// Kill the whole process group, not just the direct child.
// `claude -p` spawns its own children (the Node-side agent
// loop, MCP servers, tool helpers). A plain `child.kill()`
Expand All @@ -127,7 +174,7 @@ class TodoSupervisor {
// actually stop. We `spawn` with `detached: true` so the
// child becomes a session leader; here we signal the
// negative PID to reach every descendant.
const child = this.active.currentChild;
const child = activeRun.currentChild;
if (child?.pid) {
const pid = child.pid;
killProcessTree(pid, "SIGINT");
Expand Down Expand Up @@ -211,7 +258,7 @@ class TodoSupervisor {
startedAt: Date.now(),
currentChild: null,
};
this.active = run;
this.active.set(sessionId, run);

try {
appendSetupEvent(
Expand Down Expand Up @@ -550,7 +597,7 @@ class TodoSupervisor {
});
}
} finally {
this.active = undefined;
this.active.delete(sessionId);
}
}

Expand Down
10 changes: 9 additions & 1 deletion apps/desktop/src/main/todo-agent/trpc-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,15 @@ export const createTodoAgentRouter = () => {
get: publicProcedure.query(() => getTodoSettings()),
update: publicProcedure
.input(todoSettingsUpdateSchema)
.mutation(({ input }) => updateTodoSettings(input)),
.mutation(({ input }) => {
const next = updateTodoSettings(input);
// Nudge the supervisor so a raised `maxConcurrentTasks`
// immediately releases queued sessions. Without this, a
// bump from 1 → N leaves already-pending tasks waiting
// until the currently running session finishes.
getTodoSupervisor().handleSettingsChanged();
return next;
}),
}),

schedule: router({
Expand Down
Loading