Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { readFile } from 'node:fs/promises';
import type { ToolContext, ToolExecutionResult } from '../../../../tools/types.js';
import { getAnthropicProvider, extractText, userMessageWithImage } from '../../../../providers/anthropic-send-message.js';
import { getAnthropicProvider, extractText, userMessageWithImage, userMessageWithImages } from '../../../../providers/anthropic-send-message.js';
import { initializeProviders } from '../../../../providers/registry.js';
import { loadConfig, invalidateConfigCache } from '../../../../config/loader.js';
import {
Expand All @@ -26,12 +26,103 @@ const VLM_PROMPT = `Analyze this image frame extracted from a video. Return a JS

Return ONLY the JSON object, no additional text.`;

const CHUNKED_VLM_PROMPT = `You are analyzing a sequence of {N} consecutive video frames extracted at regular intervals from a video. The frames are in chronological order.

For EACH frame, provide a JSON object with:
- "frameIndex": the 0-based index within this chunk
- "sceneDescription": concise description of the scene in this frame
- "subjects": array of identifiable subjects/objects/people
- "actions": array of actions or activities occurring
- "context": environmental or situational context
- "transitions": any notable changes from the previous frame (empty string for the first frame)

Return a JSON array of these objects, one per frame. Return ONLY the JSON array, no additional text.`;

function buildChunks(keyframes: MediaKeyframe[], chunkSize: number, overlap: number): MediaKeyframe[][] {
const chunks: MediaKeyframe[][] = [];
const step = chunkSize - overlap;
for (let i = 0; i < keyframes.length; i += step) {
Comment thread
alex-nork marked this conversation as resolved.
chunks.push(keyframes.slice(i, i + chunkSize));
}
return chunks;
}
Comment thread
alex-nork marked this conversation as resolved.

async function analyzeChunk(
provider: import('../../../../providers/types.js').Provider,
chunk: MediaKeyframe[],
): Promise<Array<{ keyframeId: string; output: Record<string, unknown>; confidence: number }>> {
const images: Array<{ base64: string; mediaType: string }> = [];

for (const keyframe of chunk) {
const imageData = await readFile(keyframe.filePath);
const base64 = imageData.toString('base64');
const ext = keyframe.filePath.split('.').pop()?.toLowerCase() ?? 'jpg';
const mediaTypeMap: Record<string, string> = {
jpg: 'image/jpeg',
jpeg: 'image/jpeg',
png: 'image/png',
gif: 'image/gif',
webp: 'image/webp',
};
const mediaType = mediaTypeMap[ext] ?? 'image/jpeg';
images.push({ base64, mediaType });
}

const prompt = CHUNKED_VLM_PROMPT.replace('{N}', String(chunk.length));

const response = await provider.sendMessage(
[userMessageWithImages(images, prompt)],
undefined,
undefined,
{
config: {
model: 'claude-sonnet-4-6',
max_tokens: 4096,
},
},
);

const responseText = extractText(response);

let parsed: Array<Record<string, unknown>>;
try {
const jsonMatch = responseText.match(/```(?:json)?\s*([\s\S]*?)```/) ?? [null, responseText];
parsed = JSON.parse(jsonMatch[1]!.trim()) as Array<Record<string, unknown>>;
} catch {
// If parsing fails, return a single entry wrapping the raw text
parsed = chunk.map((_, idx) => ({
frameIndex: idx,
sceneDescription: idx === 0 ? responseText : '',
subjects: [],
actions: [],
context: '',
transitions: '',
}));
}

return chunk.map((keyframe, idx) => {
const entry = parsed[idx] ?? {
frameIndex: idx,
sceneDescription: '',
subjects: [],
actions: [],
context: '',
transitions: '',
};
// Add timestamp context
entry.timestamp = keyframe.timestamp;
return { keyframeId: keyframe.id, output: entry, confidence: 0.8 };
});
}

export async function analyzeKeyframesForAsset(
assetId: string,
analysisType?: string,
batchSize?: number,
onProgress?: (msg: string) => void,
signal?: AbortSignal,
chunkSize?: number,
overlap?: number,
): Promise<void> {
const type = analysisType ?? 'scene_description';
const batch = batchSize ?? 10;
Expand Down Expand Up @@ -91,63 +182,66 @@ export async function analyzeKeyframesForAsset(
throw new Error('No Anthropic API key available. Add one in Settings → Integrations.');
}

const effectiveChunkSize = chunkSize ?? 10;
const effectiveOverlap = overlap ?? 2;

let analyzedCount = analyzedKeyframeIds.size;
const totalKeyframes = keyframes.length;

onProgress?.(`Analyzing ${pendingKeyframes.length} keyframes (${analyzedKeyframeIds.size} already done)...\n`);
const chunks = buildChunks(pendingKeyframes, effectiveChunkSize, effectiveOverlap);

onProgress?.(`Analyzing ${pendingKeyframes.length} keyframes in ${chunks.length} chunks (${analyzedKeyframeIds.size} already done)...\n`);

let aborted = false;

try {
// Process in batches
for (let i = 0; i < pendingKeyframes.length; i += batch) {
for (let chunkIdx = 0; chunkIdx < chunks.length; chunkIdx++) {
if (signal?.aborted) {
onProgress?.('Aborted.\n');
aborted = true;
break;
}

const currentBatch = pendingKeyframes.slice(i, i + batch);
const batchResults: Array<{
assetId: string;
keyframeId: string;
analysisType: string;
output: Record<string, unknown>;
confidence?: number;
}> = [];

for (const keyframe of currentBatch) {
if (signal?.aborted) {
onProgress?.('Aborted.\n');
aborted = true;
break;
}

try {
const result = await analyzeKeyframe(provider, keyframe);
const chunk = chunks[chunkIdx]!;

try {
const chunkResults = await analyzeChunk(provider, chunk);

const batchResults: Array<{
assetId: string;
keyframeId: string;
analysisType: string;
output: Record<string, unknown>;
confidence?: number;
}> = [];

for (const result of chunkResults) {
// Overlap dedup: skip keyframes already inserted from a previous chunk
if (analyzedKeyframeIds.has(result.keyframeId)) {
continue;
}
analyzedKeyframeIds.add(result.keyframeId);
analyzedCount++;
batchResults.push({
assetId,
keyframeId: keyframe.id,
keyframeId: result.keyframeId,
analysisType: type,
output: result.output,
confidence: result.confidence,
});
analyzedCount++;
} catch (err) {
onProgress?.(` Warning: failed to analyze frame at ${keyframe.timestamp}s: ${(err as Error).message}\n`);
}
}

// Batch insert results
if (batchResults.length > 0) {
insertVisionOutputsBatch(batchResults);
if (batchResults.length > 0) {
insertVisionOutputsBatch(batchResults);
}
} catch (err) {
onProgress?.(` Warning: failed to analyze chunk ${chunkIdx + 1}: ${(err as Error).message}\n`);
}

// Update progress
const progress = Math.round((analyzedCount / totalKeyframes) * 100);
updateProcessingStage(stage.id, { progress });

onProgress?.(` Batch ${Math.floor(i / batch) + 1}: analyzed ${batchResults.length}/${currentBatch.length} frames (${progress}% total)\n`);
onProgress?.(` Chunk ${chunkIdx + 1}/${chunks.length}: ${chunk.length} frames (${progress}% total)\n`);
}

if (aborted) {
Expand Down Expand Up @@ -182,6 +276,8 @@ export async function run(

const analysisType = (input.analysis_type as string) || 'scene_description';
const batchSize = (input.batch_size as number) || 10;
const chunkSizeInput = (input.chunk_size as number) || 10;
const overlapInput = (input.overlap as number) || 2;
Comment thread
alex-nork marked this conversation as resolved.

try {
// Check if all keyframes are already analyzed before calling the core function
Expand All @@ -203,7 +299,7 @@ export async function run(
};
}

await analyzeKeyframesForAsset(assetId, analysisType, batchSize, context.onOutput, context.signal);
await analyzeKeyframesForAsset(assetId, analysisType, batchSize, context.onOutput, context.signal, chunkSizeInput, overlapInput);

// Gather final stats
const allKeyframes = getKeyframesForAsset(assetId);
Expand Down
Loading