diff --git a/assistant/src/config/bundled-skills/media-processing/tools/analyze-keyframes.ts b/assistant/src/config/bundled-skills/media-processing/tools/analyze-keyframes.ts index 16599e134b6..ee2e0350c1c 100644 --- a/assistant/src/config/bundled-skills/media-processing/tools/analyze-keyframes.ts +++ b/assistant/src/config/bundled-skills/media-processing/tools/analyze-keyframes.ts @@ -1,6 +1,6 @@ import { readFile } from 'node:fs/promises'; import type { ToolContext, ToolExecutionResult } from '../../../../tools/types.js'; -import { getAnthropicProvider, extractText, userMessageWithImage } from '../../../../providers/anthropic-send-message.js'; +import { getAnthropicProvider, extractText, userMessageWithImage, userMessageWithImages } from '../../../../providers/anthropic-send-message.js'; import { initializeProviders } from '../../../../providers/registry.js'; import { loadConfig, invalidateConfigCache } from '../../../../config/loader.js'; import { @@ -26,12 +26,103 @@ const VLM_PROMPT = `Analyze this image frame extracted from a video. Return a JS Return ONLY the JSON object, no additional text.`; +const CHUNKED_VLM_PROMPT = `You are analyzing a sequence of {N} consecutive video frames extracted at regular intervals from a video. The frames are in chronological order. + +For EACH frame, provide a JSON object with: +- "frameIndex": the 0-based index within this chunk +- "sceneDescription": concise description of the scene in this frame +- "subjects": array of identifiable subjects/objects/people +- "actions": array of actions or activities occurring +- "context": environmental or situational context +- "transitions": any notable changes from the previous frame (empty string for the first frame) + +Return a JSON array of these objects, one per frame. Return ONLY the JSON array, no additional text.`; + +function buildChunks(keyframes: MediaKeyframe[], chunkSize: number, overlap: number): MediaKeyframe[][] { + const chunks: MediaKeyframe[][] = []; + const step = chunkSize - overlap; + for (let i = 0; i < keyframes.length; i += step) { + chunks.push(keyframes.slice(i, i + chunkSize)); + } + return chunks; +} + +async function analyzeChunk( + provider: import('../../../../providers/types.js').Provider, + chunk: MediaKeyframe[], +): Promise; confidence: number }>> { + const images: Array<{ base64: string; mediaType: string }> = []; + + for (const keyframe of chunk) { + const imageData = await readFile(keyframe.filePath); + const base64 = imageData.toString('base64'); + const ext = keyframe.filePath.split('.').pop()?.toLowerCase() ?? 'jpg'; + const mediaTypeMap: Record = { + jpg: 'image/jpeg', + jpeg: 'image/jpeg', + png: 'image/png', + gif: 'image/gif', + webp: 'image/webp', + }; + const mediaType = mediaTypeMap[ext] ?? 'image/jpeg'; + images.push({ base64, mediaType }); + } + + const prompt = CHUNKED_VLM_PROMPT.replace('{N}', String(chunk.length)); + + const response = await provider.sendMessage( + [userMessageWithImages(images, prompt)], + undefined, + undefined, + { + config: { + model: 'claude-sonnet-4-6', + max_tokens: 4096, + }, + }, + ); + + const responseText = extractText(response); + + let parsed: Array>; + try { + const jsonMatch = responseText.match(/```(?:json)?\s*([\s\S]*?)```/) ?? [null, responseText]; + parsed = JSON.parse(jsonMatch[1]!.trim()) as Array>; + } catch { + // If parsing fails, return a single entry wrapping the raw text + parsed = chunk.map((_, idx) => ({ + frameIndex: idx, + sceneDescription: idx === 0 ? responseText : '', + subjects: [], + actions: [], + context: '', + transitions: '', + })); + } + + return chunk.map((keyframe, idx) => { + const entry = parsed[idx] ?? { + frameIndex: idx, + sceneDescription: '', + subjects: [], + actions: [], + context: '', + transitions: '', + }; + // Add timestamp context + entry.timestamp = keyframe.timestamp; + return { keyframeId: keyframe.id, output: entry, confidence: 0.8 }; + }); +} + export async function analyzeKeyframesForAsset( assetId: string, analysisType?: string, batchSize?: number, onProgress?: (msg: string) => void, signal?: AbortSignal, + chunkSize?: number, + overlap?: number, ): Promise { const type = analysisType ?? 'scene_description'; const batch = batchSize ?? 10; @@ -91,63 +182,66 @@ export async function analyzeKeyframesForAsset( throw new Error('No Anthropic API key available. Add one in Settings → Integrations.'); } + const effectiveChunkSize = chunkSize ?? 10; + const effectiveOverlap = overlap ?? 2; + let analyzedCount = analyzedKeyframeIds.size; const totalKeyframes = keyframes.length; - onProgress?.(`Analyzing ${pendingKeyframes.length} keyframes (${analyzedKeyframeIds.size} already done)...\n`); + const chunks = buildChunks(pendingKeyframes, effectiveChunkSize, effectiveOverlap); + + onProgress?.(`Analyzing ${pendingKeyframes.length} keyframes in ${chunks.length} chunks (${analyzedKeyframeIds.size} already done)...\n`); let aborted = false; try { - // Process in batches - for (let i = 0; i < pendingKeyframes.length; i += batch) { + for (let chunkIdx = 0; chunkIdx < chunks.length; chunkIdx++) { if (signal?.aborted) { onProgress?.('Aborted.\n'); aborted = true; break; } - const currentBatch = pendingKeyframes.slice(i, i + batch); - const batchResults: Array<{ - assetId: string; - keyframeId: string; - analysisType: string; - output: Record; - confidence?: number; - }> = []; - - for (const keyframe of currentBatch) { - if (signal?.aborted) { - onProgress?.('Aborted.\n'); - aborted = true; - break; - } - - try { - const result = await analyzeKeyframe(provider, keyframe); + const chunk = chunks[chunkIdx]!; + + try { + const chunkResults = await analyzeChunk(provider, chunk); + + const batchResults: Array<{ + assetId: string; + keyframeId: string; + analysisType: string; + output: Record; + confidence?: number; + }> = []; + + for (const result of chunkResults) { + // Overlap dedup: skip keyframes already inserted from a previous chunk + if (analyzedKeyframeIds.has(result.keyframeId)) { + continue; + } + analyzedKeyframeIds.add(result.keyframeId); + analyzedCount++; batchResults.push({ assetId, - keyframeId: keyframe.id, + keyframeId: result.keyframeId, analysisType: type, output: result.output, confidence: result.confidence, }); - analyzedCount++; - } catch (err) { - onProgress?.(` Warning: failed to analyze frame at ${keyframe.timestamp}s: ${(err as Error).message}\n`); } - } - // Batch insert results - if (batchResults.length > 0) { - insertVisionOutputsBatch(batchResults); + if (batchResults.length > 0) { + insertVisionOutputsBatch(batchResults); + } + } catch (err) { + onProgress?.(` Warning: failed to analyze chunk ${chunkIdx + 1}: ${(err as Error).message}\n`); } - // Update progress const progress = Math.round((analyzedCount / totalKeyframes) * 100); updateProcessingStage(stage.id, { progress }); - onProgress?.(` Batch ${Math.floor(i / batch) + 1}: analyzed ${batchResults.length}/${currentBatch.length} frames (${progress}% total)\n`); + onProgress?.(` Chunk ${chunkIdx + 1}/${chunks.length}: ${chunk.length} frames (${progress}% total)\n`); } if (aborted) { @@ -182,6 +276,8 @@ export async function run( const analysisType = (input.analysis_type as string) || 'scene_description'; const batchSize = (input.batch_size as number) || 10; + const chunkSizeInput = (input.chunk_size as number) || 10; + const overlapInput = (input.overlap as number) || 2; try { // Check if all keyframes are already analyzed before calling the core function @@ -203,7 +299,7 @@ export async function run( }; } - await analyzeKeyframesForAsset(assetId, analysisType, batchSize, context.onOutput, context.signal); + await analyzeKeyframesForAsset(assetId, analysisType, batchSize, context.onOutput, context.signal, chunkSizeInput, overlapInput); // Gather final stats const allKeyframes = getKeyframesForAsset(assetId);