Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 46 additions & 18 deletions assistant/src/memory/jobs-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ import {
memoryV2ConsolidateJob,
} from "./v2/consolidation-job.js";
import { memoryV2SweepJob } from "./v2/sweep-job.js";
import { memoryV3ConsolidateJob } from "./v3/consolidation-job.js";
import { memoryV3IndexMaintenanceJob } from "./v3/maintenance.js";

const log = getLogger("memory-jobs-worker");

Expand Down Expand Up @@ -603,6 +605,12 @@ async function processJob(
case "memory_v2_consolidate":
await memoryV2ConsolidateJob(job, config);
return;
case "memory_v3_consolidate":
await memoryV3ConsolidateJob(job, config);
return;
case "memory_v3_index_maintenance":
await memoryV3IndexMaintenanceJob(job);
return;
case "memory_v2_migrate":
await memoryV2MigrateJob(job, config);
return;
Expand Down Expand Up @@ -681,17 +689,28 @@ export const GRAPH_MAINTENANCE_CHECKPOINTS = {
patternScan: "graph_maintenance:pattern_scan:last_run",
narrative: "graph_maintenance:narrative:last_run",
memoryV2Consolidate: "memory_v2_consolidate_last_run",
memoryV3Consolidate: "memory_v3_consolidate_last_run",
} as const;

/**
* Enqueue periodic graph maintenance jobs.
*
* Mutually exclusive between v1 and v2:
* - v2 active (`memory.v2.enabled` on) → only `memory_v2_consolidate` is
* scheduled.
* - v2 active (`memory.v2.enabled` on) → only one buffer-drainer is
* scheduled (see below).
* - v2 inactive → the four v1 entries (decay, consolidate, pattern_scan,
* narrative) are scheduled instead.
*
* **Buffer-drainer retarget (v2 vs v3).** The `memory/buffer.md` is shared, so
* exactly one consolidator may own the drain at a time. When
* `memory.v3.write.enabled` is on, the v3 consolidator (`memory_v3_consolidate`)
* is scheduled INSTEAD of `memory_v2_consolidate` — same shared buffer +
* standing-context files, additionally authored into the v3 tree. When the v3
* write flag is off (default) the v2 consolidator stays the sole drainer,
* unchanged. The retarget is a clean conditional, fully reversible via the flag.
* Concept pages stay the shared canonical store, so the v2 router keeps working
* off pages v3 writes regardless of which consolidator ran.
*
* Read/write paths route to v2 when the flag is on, so v1 graph data goes
* unread; running v1 maintenance alongside v2 is wasted compute and LLM
* spend. The v1 code path remains live so flipping the flag back to off
Expand All @@ -708,20 +727,29 @@ export function maybeEnqueueGraphMaintenanceJobs(
nowMs = Date.now(),
): void {
const v2Active = config.memory.v2.enabled;
const v3WriteActive = config.memory.v3.write.enabled;

// The single buffer-drainer entry for the v2-active branch: v3 when the v3
// write flag owns the drain, v2 otherwise. Same shared buffer either way.
const consolidateEntry = v3WriteActive
? {
key: GRAPH_MAINTENANCE_CHECKPOINTS.memoryV3Consolidate,
intervalMs: config.memory.v3.write.consolidateIntervalMs,
jobType: "memory_v3_consolidate" as MemoryJobType,
}
: {
key: GRAPH_MAINTENANCE_CHECKPOINTS.memoryV2Consolidate,
intervalMs:
config.memory.v2.consolidation_interval_hours * 60 * 60 * 1000,
jobType: "memory_v2_consolidate" as MemoryJobType,
};

const schedule: Array<{
key: string;
intervalMs: number;
jobType: MemoryJobType;
}> = v2Active
? [
{
key: GRAPH_MAINTENANCE_CHECKPOINTS.memoryV2Consolidate,
intervalMs:
config.memory.v2.consolidation_interval_hours * 60 * 60 * 1000,
jobType: "memory_v2_consolidate",
},
]
? [consolidateEntry]
: [
{
key: GRAPH_MAINTENANCE_CHECKPOINTS.decay,
Expand All @@ -745,25 +773,25 @@ export function maybeEnqueueGraphMaintenanceJobs(
},
];

let enqueuedV2 = false;
let enqueuedConsolidate = false;
for (const { key, intervalMs, jobType } of schedule) {
const lastRun = parseInt(getMemoryCheckpoint(key) ?? "0", 10);
if (nowMs - lastRun >= intervalMs) {
enqueueMemoryJob(jobType, {});
setMemoryCheckpoint(key, String(nowMs));
if (jobType === "memory_v2_consolidate") enqueuedV2 = true;
if (jobType === consolidateEntry.jobType) enqueuedConsolidate = true;
}
}

// Size-based trigger: when the shared buffer crosses the configured line
// count, drain it now rather than waiting out the interval. Retargets to the
// same consolidator the interval branch above selected.
const maxLines = config.memory.v2.consolidation_max_buffer_lines;
if (v2Active && !enqueuedV2 && maxLines !== null) {
if (v2Active && !enqueuedConsolidate && maxLines !== null) {
const bufferPath = join(getWorkspaceDir(), "memory", "buffer.md");
if (countBufferLines(bufferPath) >= maxLines) {
enqueueMemoryJob("memory_v2_consolidate", {});
setMemoryCheckpoint(
GRAPH_MAINTENANCE_CHECKPOINTS.memoryV2Consolidate,
String(nowMs),
);
enqueueMemoryJob(consolidateEntry.jobType, {});
setMemoryCheckpoint(consolidateEntry.key, String(nowMs));
}
}
}
Loading
Loading