Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions assistant/src/cli/commands/__tests__/email-list.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { runAssistantCommand } from "../../__tests__/run-assistant-command.js";

const ASSISTANT_ID = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
const API_KEY_CREDENTIAL = credentialKey("vellum", "assistant_api_key");
const ASSISTANT_ID_CREDENTIAL = credentialKey("vellum", "platform_assistant_id");

/**
* Return the recorded fetch calls, excluding the feature-flag fetch that
Expand Down Expand Up @@ -83,6 +84,11 @@ beforeEach(async () => {
_setOverridesForTesting({ "email-channel": true });
setPlatformAssistantId(ASSISTANT_ID);
await setSecureKeyAsync(API_KEY_CREDENTIAL, "test-api-key");
// Ensure VellumPlatformClient.create() cannot fall back to a real
// platform_assistant_id from the encrypted credential store on dev
// machines — the "missing assistant ID" test relies on the fallback
// lookup returning empty.
await deleteSecureKeyAsync(ASSISTANT_ID_CREDENTIAL);
});

afterEach(() => {
Expand Down
30 changes: 9 additions & 21 deletions assistant/src/config/schemas/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,16 @@ export const HeartbeatConfigSchema = z
const startNull = config.activeHoursStart == null;
const endNull = config.activeHoursEnd == null;
if (startNull !== endNull) {
// Emit on both fields so validateWithSchema's delete-and-retry strips
// both sides in one pass. Single-emit on the null side can cascade when
// the explicit value happens to equal the opposite default (e.g.
// { start: null, end: 8 } → strip start → default 8 → equal check fires
// → loader falls back to full defaults, wiping unrelated keys like
// maxTokens).
// Emit only on the null side so validateWithSchema's delete-and-retry
// preserves the explicit non-null value. Dual-emit would delete both
// keys, losing valid explicit values for mixed-null configs like
// { activeHoursStart: null, activeHoursEnd: 20 } → (8, 22) instead of
// retaining the explicit 20.
const message =
"heartbeat.activeHoursStart and heartbeat.activeHoursEnd must both be set or both be null";
ctx.addIssue({
code: z.ZodIssueCode.custom,
path: ["activeHoursStart"],
message,
});
ctx.addIssue({
code: z.ZodIssueCode.custom,
path: ["activeHoursEnd"],
path: [startNull ? "activeHoursStart" : "activeHoursEnd"],
Comment thread
siddseethepalli marked this conversation as resolved.
message,
});
Comment thread
siddseethepalli marked this conversation as resolved.
return;
Expand All @@ -68,17 +62,11 @@ export const HeartbeatConfigSchema = z
config.activeHoursEnd != null &&
config.activeHoursStart === config.activeHoursEnd
) {
// Emit on both fields. Single-emit would strip one side and the default
// for that side could recreate a new mismatch (e.g. { start: 22, end: 22 }
// → strip end → default 22 → equal again), cascading to a full defaults
// reset that wipes unrelated fields.
// Emit only on activeHoursEnd so the explicit start value is preserved.
// Dual-emit would delete both keys, e.g. { start: 5, end: 5 } → (8, 22)
// instead of preserving the explicit 5 as start → (5, 22).
const message =
"heartbeat.activeHoursStart and heartbeat.activeHoursEnd must not be equal (would create an empty window)";
ctx.addIssue({
code: z.ZodIssueCode.custom,
path: ["activeHoursStart"],
message,
});
ctx.addIssue({
code: z.ZodIssueCode.custom,
path: ["activeHoursEnd"],
Expand Down
27 changes: 19 additions & 8 deletions assistant/src/credential-execution/executable-discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,28 @@ function getManagedBootstrapSocketPath(): string {
* a malicious binary there. Removed to close the sandbox-escape vector.
*
* Search order:
* 1. Alongside the running executable (packaged macOS app:
* `<App>.app/Contents/MacOS/credential-executor`). When running from
* source via `bun run`, `process.execPath` points at the bun binary
* itself, so this path won't exist and the search falls through.
* 1. Alongside the running executable, but ONLY when running from a
* packaged macOS app bundle (`<App>.app/Contents/MacOS/credential-executor`).
* In dev mode, `process.execPath` points at the bun/node install dir
* (e.g. `~/.bun/bin`), where an unrelated file named `credential-executor`
* could be picked up by accident.
* 2. `<binDir>/credential-executor` — user-installed override (dev flow).
*/
function getLocalBinarySearchPaths(): string[] {
  // Directory that contains the currently running executable.
  const execDir = dirname(process.execPath);

  // Only trust a sibling of process.execPath when that directory sits inside
  // a packaged app bundle (`<App>.app/Contents/MacOS`). In dev mode,
  // process.execPath is the bun/node binary (e.g. ~/.bun/bin/bun), and a
  // sibling lookup there could discover an unrelated or untrusted executable.
  //
  // The pattern requires `MacOS` to be a complete path segment (followed by
  // `/` or end of string), so look-alike directories such as
  // `Foo.app/Contents/MacOS-other` do not pass the check the way a plain
  // substring match would.
  const isInsideAppBundle = /\.app\/Contents\/MacOS(?:\/|$)/.test(execDir);

  const bundledCandidates = isInsideAppBundle
    ? [join(execDir, "credential-executor")]
    : [];

  // The bin-dir candidate is always searched, and always after the bundled
  // candidate so a packaged binary takes precedence when both exist.
  return [...bundledCandidates, join(getBinDir(), "credential-executor")];
}

// ---------------------------------------------------------------------------
Expand Down
27 changes: 14 additions & 13 deletions assistant/src/memory/conversation-analyze-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
//
// Bridges the jobs worker to the shared analyzeConversation() service. The
// deps bundle is stashed on a module singleton during daemon startup; if it
// isn't set yet we skip this iteration. The next batch / idle / lifecycle
// trigger from `enqueueAutoAnalysisIfEnabled()` will produce a fresh job
// once the daemon has fully started.
// isn't set yet the handler throws BackendUnavailableError so the worker
// defers with exponential backoff until deps become available.
//
// The service itself distinguishes manual vs. auto triggers: this handler
// always invokes with `trigger: "auto"`, so the rolling analysis conversation
Expand All @@ -15,6 +14,7 @@
import type { AssistantConfig } from "../config/types.js";
import { analyzeConversation } from "../runtime/services/analyze-conversation.js";
import { getAnalysisDeps } from "../runtime/services/analyze-deps-singleton.js";
import { BackendUnavailableError } from "../util/errors.js";
import { getLogger } from "../util/logger.js";
import { enqueueAutoAnalysisIfEnabled } from "./auto-analysis-enqueue.js";
import type { MemoryJob } from "./jobs-store.js";
Expand All @@ -33,19 +33,20 @@ export async function conversationAnalyzeJob(

const deps = getAnalysisDeps();
if (!deps) {
// Daemon hasn't finished startup. Return without throwing — a plain
// Error here would be classified as fatal by `classifyError()` and the
// worker would mark the job permanently failed. Throwing
// `BackendUnavailableError` would defer, but defer counters cap out and
// would still permanently fail in the worst case. Since
// `enqueueAutoAnalysisIfEnabled()` re-enqueues on the next batch / idle
// / lifecycle trigger, dropping this iteration is the safest choice and
// avoids retry storms during slow daemon startup.
// Daemon hasn't finished startup. Throw BackendUnavailableError so the
// worker defers the job with exponential backoff instead of completing
// it. Returning success here would permanently drop the job via
// completeMemoryJob — conversations with a pre-existing queued job
// during startup and no subsequent activity would never be analyzed.
// The deferral budget (50 × up to 5min backoff) is generous enough to
// outlast any realistic startup delay.
log.warn(
{ jobId: job.id, conversationId },
"Skipping job: analysis deps not yet initialized; will retrigger",
"Deferring job: analysis deps not yet initialized",
);
throw new BackendUnavailableError(
"Analysis deps not yet initialized during daemon startup",
);
return;
}

const result = await analyzeConversation(conversationId, deps, {
Expand Down
10 changes: 5 additions & 5 deletions assistant/src/memory/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,11 @@ export async function indexMessageNow(
);

// ── Auto-analysis triggers ─────────────────────────────────────
// Both triggers route through `upsertDebouncedJob` in the helper,
// so a single pending row is shared. Order matters: the idle
// upsert runs first (pushing `runAfter` into the future); the
// batch trigger runs last so a threshold crossing pulls
// `runAfter` back to "now" and overrides the idle debounce.
// Immediate triggers (batch, compaction) and debounced triggers
// (idle, lifecycle) write to separate rows keyed by triggerGroup
// via `upsertAutoAnalysisJob`. When an immediate trigger fires,
// it cancels any pending debounced row for the same conversation
// to avoid redundant analysis runs.
enqueueAutoAnalysisIfEnabled({
conversationId: input.conversationId,
trigger: "idle",
Expand Down
45 changes: 42 additions & 3 deletions assistant/src/memory/jobs-store.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { and, asc, eq, inArray, lte, notInArray, sql } from "drizzle-orm";
import { and, asc, eq, inArray, lte, notInArray, or, sql } from "drizzle-orm";
import { v4 as uuid } from "uuid";

import { getLogger } from "../util/logger.js";
Expand Down Expand Up @@ -165,6 +165,10 @@ export function upsertAutoAnalysisJob(
: never,
): void {
const db = dbOverride ?? getDb();
// Match rows with the same triggerGroup OR legacy rows without triggerGroup
// (from older builds that used upsertDebouncedJob before triggerGroup was
// introduced). Without the IS NULL fallback, the next enqueue would insert
// a duplicate pending row for the same conversation.
const existing = db
.select()
.from(memoryJobs)
Expand All @@ -173,18 +177,53 @@ export function upsertAutoAnalysisJob(
eq(memoryJobs.type, "conversation_analyze"),
eq(memoryJobs.status, "pending"),
sql`json_extract(${memoryJobs.payload}, '$.conversationId') = ${payload.conversationId}`,
sql`json_extract(${memoryJobs.payload}, '$.triggerGroup') = ${payload.triggerGroup}`,
or(
sql`json_extract(${memoryJobs.payload}, '$.triggerGroup') = ${payload.triggerGroup}`,
sql`json_extract(${memoryJobs.payload}, '$.triggerGroup') IS NULL`,
),
),
)
.get();
if (existing) {
// Merge triggerGroup into legacy rows so subsequent lookups use the new key.
const existingPayload = JSON.parse(existing.payload) as Record<
string,
unknown
>;
const needsPayloadUpdate = !existingPayload.triggerGroup;
db.update(memoryJobs)
.set({ runAfter, updatedAt: Date.now() })
.set({
runAfter,
updatedAt: Date.now(),
...(needsPayloadUpdate
? {
payload: JSON.stringify({ ...existingPayload, ...payload }),
}
: {}),
})
.where(eq(memoryJobs.id, existing.id))
.run();
} else {
enqueueMemoryJob("conversation_analyze", payload, runAfter, dbOverride);
}

// When an immediate trigger fires (batch/compaction), cancel any pending
// debounced row for the same conversation — the immediate analysis covers
// those messages, making the debounced pass redundant. Without this, both
// rows fire independently and double the LLM cost per batch crossing.
if (payload.triggerGroup === "immediate") {
db.update(memoryJobs)
.set({ status: "completed", updatedAt: Date.now() })
.where(
and(
eq(memoryJobs.type, "conversation_analyze"),
eq(memoryJobs.status, "pending"),
sql`json_extract(${memoryJobs.payload}, '$.conversationId') = ${payload.conversationId}`,
sql`json_extract(${memoryJobs.payload}, '$.triggerGroup') = 'debounced'`,
),
)
.run();
}
}

/**
Expand Down
67 changes: 38 additions & 29 deletions skills/meet-join/daemon/session-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ class MeetSessionManagerImpl {
}
}
this.sessions.clear();
this.pendingBotTokens.clear();
}

/**
Expand Down Expand Up @@ -704,34 +705,43 @@ class MeetSessionManagerImpl {
// authoritative `this.sessions` lookup once the session is in the map.
this.pendingBotTokens.set(meetingId, botApiToken);

// Placeholder — Phase 3 (PR 23+) will resolve the real TTS credential.
const ttsKey = (await this.deps.getProviderKey("tts")) ?? "";

const daemonUrl = this.deps.resolveDaemonUrl();

// Resolve the effective bot display name. Priority:
// 1. `services.meet.joinName` when set.
// 2. The assistant display name from IDENTITY.md.
// 3. {@link MEET_JOIN_NAME_FALLBACK} — guarantees a non-empty string
// so the bot's `needsFullWiring` predicate never silently downgrades
// the container to screenshot-only mode.
// The same value is used for `JOIN_NAME` AND for `{assistantName}`
// substitution in the consent message — the bot needs both.
const effectiveJoinName =
meet.joinName ??
this.deps.resolveAssistantDisplayName() ??
MEET_JOIN_NAME_FALLBACK;

// `{assistantName}` substitution is owned by the `meet_join` tool
// (PR 23), which resolves the assistant name from IDENTITY.md and
// passes a substituted string via `input.consentMessage`. Callers that
// bypass the tool (direct API users, tests) pass the raw template —
// substitute here so the bot receives a human-readable greeting
// regardless of entry point.
const resolvedConsentMessage = substituteAssistantName(
consentMessage ?? meet.consentMessage,
effectiveJoinName,
);
let ttsKey: string;
let daemonUrl: string;
let effectiveJoinName: string;
let resolvedConsentMessage: string;
try {
// Placeholder — Phase 3 (PR 23+) will resolve the real TTS credential.
ttsKey = (await this.deps.getProviderKey("tts")) ?? "";

daemonUrl = this.deps.resolveDaemonUrl();

// Resolve the effective bot display name. Priority:
// 1. `services.meet.joinName` when set.
// 2. The assistant display name from IDENTITY.md.
// 3. {@link MEET_JOIN_NAME_FALLBACK} — guarantees a non-empty string
// so the bot's `needsFullWiring` predicate never silently downgrades
// the container to screenshot-only mode.
// The same value is used for `JOIN_NAME` AND for `{assistantName}`
// substitution in the consent message — the bot needs both.
effectiveJoinName =
meet.joinName ??
this.deps.resolveAssistantDisplayName() ??
MEET_JOIN_NAME_FALLBACK;

// `{assistantName}` substitution is owned by the `meet_join` tool
// (PR 23), which resolves the assistant name from IDENTITY.md and
// passes a substituted string via `input.consentMessage`. Callers that
// bypass the tool (direct API users, tests) pass the raw template —
// substitute here so the bot receives a human-readable greeting
// regardless of entry point.
resolvedConsentMessage = substituteAssistantName(
consentMessage ?? meet.consentMessage,
effectiveJoinName,
);
} catch (err) {
this.pendingBotTokens.delete(meetingId);
throw err;
}
Comment thread
siddseethepalli marked this conversation as resolved.

// Register the dispatcher BEFORE the audio-ingest starts so transcripts
// fired by Deepgram the instant the streaming session opens cannot race
Expand Down Expand Up @@ -1810,7 +1820,6 @@ export function substituteAssistantName(
return template.split("{assistantName}").join(assistantName);
}

/** Strip internal fields (`timeoutHandle`) from a session before exposing it. */
/**
* Best-effort: pull the bot container's accumulated stdout/stderr and
* persist it to `<meetingDir>/bot.log` before the container is removed.
Expand Down
Loading