diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/index.ts b/x-pack/platform/packages/shared/kbn-streams-schema/index.ts
index 0d5fc7dec8031..be6a6568eaa9a 100644
--- a/x-pack/platform/packages/shared/kbn-streams-schema/index.ts
+++ b/x-pack/platform/packages/shared/kbn-streams-schema/index.ts
@@ -260,6 +260,7 @@ export { TaskStatus, type TaskResult } from './src/tasks/types';
 export type { GenerateDescriptionResult } from './src/api/description_generation';
 export type { IdentifyFeaturesResult, IterationResult } from './src/api/features';
+export { tokenCountSchema, iterationResultSchema } from './src/api/features';
 export {
   type GenerateInsightsResult,
diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/api/features/index.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/api/features/index.ts
index e44492b0a9a75..7d00d3fb44f01 100644
--- a/x-pack/platform/packages/shared/kbn-streams-schema/src/api/features/index.ts
+++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/api/features/index.ts
@@ -5,17 +5,34 @@
  * 2.0.
  */
 
+import { z } from '@kbn/zod/v4';
 import type { ChatCompletionTokenCount } from '@kbn/inference-common';
 import type { BaseFeature } from '../../feature';
 
-export interface IterationResult {
-  iteration: number;
-  durationMs: number;
-  state: 'success' | 'failure';
-  tokensUsed: ChatCompletionTokenCount;
-  newFeatures: Array<{ id: string; title: string }>;
-  updatedFeatures: Array<{ id: string; title: string }>;
-}
+export const tokenCountSchema = z.object({
+  prompt: z.number(),
+  completion: z.number(),
+  thinking: z.number().optional(),
+  total: z.number(),
+  cached: z.number().optional(),
+});
+
+const featureSummarySchema = z.object({
+  id: z.string(),
+  title: z.string(),
+});
+
+export const iterationResultSchema = z.object({
+  runId: z.string(),
+  iteration: z.number(),
+  durationMs: z.number(),
+  state: z.enum(['success', 'failure']),
+  tokensUsed: tokenCountSchema,
+  newFeatures: z.array(featureSummarySchema),
+  updatedFeatures: z.array(featureSummarySchema),
+});
+
+export type IterationResult = z.infer<typeof iterationResultSchema>;
 
 export interface IdentifyFeaturesResult {
   features: BaseFeature[];
diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/feature.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/feature.ts
index 006798882a64a..e44c91550c45d 100644
--- a/x-pack/platform/packages/shared/kbn-streams-schema/src/feature.ts
+++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/feature.ts
@@ -73,6 +73,7 @@ export const featureSchema = baseFeatureSchema.and(
     last_seen: z.string(),
     expires_at: z.string().optional(),
     excluded_at: z.string().optional(),
+    run_id: z.string().optional(),
   })
 );
 
diff --git a/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/identify_computed_features.ts b/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/identify_computed_features.ts
new file mode 100644
index 0000000000000..0cc648bb4ef9f
--- /dev/null
+++ b/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/identify_computed_features.ts
@@ -0,0 +1,61 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import type { ElasticsearchClient } from '@kbn/core/server';
+import type { Logger } from '@kbn/logging';
+import type { Feature, Streams } from '@kbn/streams-schema';
+import { generateAllComputedFeatures } from '@kbn/streams-ai';
+import type { FeatureClient } from '../../streams/feature/feature_client';
+import { reconcileComputedFeatures } from './reconcile_features';
+
+export interface IdentifyComputedFeaturesOptions {
+  stream: Streams.all.Definition;
+  streamName: string;
+  start: number;
+  end: number;
+  esClient: ElasticsearchClient;
+  featureClient: FeatureClient;
+  logger: Logger;
+  featureTtlDays?: number;
+  runId: string;
+}
+
+export async function identifyComputedFeatures({
+  stream,
+  streamName,
+  start,
+  end,
+  esClient,
+  featureClient,
+  logger,
+  featureTtlDays,
+  runId,
+}: IdentifyComputedFeaturesOptions): Promise<Feature[]> {
+  const computedFeatures = await generateAllComputedFeatures({
+    stream,
+    start,
+    end,
+    esClient,
+    logger: logger.get('computed_features'),
+  });
+
+  const reconciledComputedFeatures = reconcileComputedFeatures({
+    computedFeatures,
+    streamName,
+    featureTtlDays,
+    runId,
+  });
+
+  if (reconciledComputedFeatures.length > 0) {
+    await featureClient.bulk(
+      streamName,
+      reconciledComputedFeatures.map((feature) => ({ index: { feature } }))
+    );
+  }
+
+  return reconciledComputedFeatures;
+}
diff --git a/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/identify_inferred_features.ts b/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/identify_inferred_features.ts
new file mode 100644
index 0000000000000..0255abeb496f1
--- /dev/null
+++ b/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/identify_inferred_features.ts
@@ -0,0 +1,521 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */ + +import type { ElasticsearchClient } from '@kbn/core/server'; +import type { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-server'; +import type { Logger } from '@kbn/logging'; +import type { BoundInferenceClient, ChatCompletionTokenCount } from '@kbn/inference-common'; +import type { StreamType } from '@kbn/streams-schema'; +import { + type Feature, + type BaseFeature, + type IterationResult, + isComputedFeature, + isFeatureWithFilter, +} from '@kbn/streams-schema'; +import { + identifyFeatures, + type ExcludedFeatureSummary, + type IgnoredFeature, +} from '@kbn/streams-ai'; +import type { FeatureClient } from '../../streams/feature/feature_client'; +import { fetchSampleDocuments } from '../../tasks/task_definitions/features_identification/fetch_sample_documents'; +import { PromptsConfigService } from '../saved_objects/prompts_config_service'; +import type { SigEventsTuningConfig } from '../../../../common/sig_events_tuning_config'; +import { DEFAULT_SIG_EVENTS_TUNING_CONFIG } from '../../../../common/sig_events_tuning_config'; +import { EMPTY_TOKENS } from './iteration_state'; +import { + reconcileInferredFeatures, + toFeatureSummary, + toFeatureProjection, +} from './reconcile_features'; + +const DEFAULT_MAX_PREVIOUSLY_IDENTIFIED_FEATURES = 100; + +// --------------------------------------------------------------------------- +// Tuning params type (subset of SigEventsTuningConfig) +// --------------------------------------------------------------------------- + +type IterationTuningParams = Partial< + Pick< + SigEventsTuningConfig, + | 'sample_size' + | 'feature_ttl_days' + | 'entity_filtered_ratio' + | 'diverse_ratio' + | 'max_excluded_features_in_prompt' + | 'max_entity_filters' + > +> & { + maxPreviouslyIdentifiedFeatures?: number; +}; + +// --------------------------------------------------------------------------- +// Telemetry +// --------------------------------------------------------------------------- + +export interface FeaturesIdentifiedTelemetry { + run_id: string; + iteration: number; + stream_name: string; + stream_type: StreamType; + docs_count: number; + excluded_features_count: number; + total_filters: number; + filters_capped: boolean; + has_filtered_documents: boolean; + duration_ms: number; + state: 'success' | 'failure' | 'canceled'; + features_new: number; + features_updated: number; + input_tokens_used: number; + output_tokens_used: number; + total_tokens_used: number; + cached_tokens_used: number; + llm_ignored_count: number; + code_ignored_count: number; +} + +export interface TelemetryContext { + run_id: string; + iteration: number; + stream_name: string; + stream_type: StreamType; + docs_count: number; + excluded_features_count: number; + total_filters: number; + filters_capped: boolean; + has_filtered_documents: boolean; +} + +export function buildTelemetry( + ctx: TelemetryContext, + durationMs: number, + outcome: + | { state: 'failure' | 'canceled' } + | { + state: 'success'; + tokensUsed: ChatCompletionTokenCount; + newCount: number; + updatedCount: number; + llmIgnoredCount: number; + codeIgnoredCount: number; + } +): FeaturesIdentifiedTelemetry { + if (outcome.state !== 'success') { + return { + ...ctx, + duration_ms: durationMs, + state: outcome.state, + features_new: 0, + features_updated: 0, + input_tokens_used: 0, + output_tokens_used: 0, + total_tokens_used: 0, + cached_tokens_used: 0, + llm_ignored_count: 0, + code_ignored_count: 0, + }; + } + const { tokensUsed } = outcome; + return { + ...ctx, + duration_ms: durationMs, 
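+    // ChatCompletionTokenCount maps onto the flat telemetry fields: prompt -> input, completion -> output.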
+    state: 'success',
+    features_new: outcome.newCount,
+    features_updated: outcome.updatedCount,
+    input_tokens_used: tokensUsed.prompt,
+    output_tokens_used: tokensUsed.completion,
+    total_tokens_used: tokensUsed.total,
+    cached_tokens_used: tokensUsed.cached ?? 0,
+    llm_ignored_count: outcome.llmIgnoredCount,
+    code_ignored_count: outcome.codeIgnoredCount,
+  };
+}
+
+// ---------------------------------------------------------------------------
+// LLM inference wrapper
+// ---------------------------------------------------------------------------
+
+type InferenceResult =
+  | {
+      success: true;
+      rawFeatures: BaseFeature[];
+      ignoredFeatures: IgnoredFeature[];
+      tokensUsed: ChatCompletionTokenCount;
+    }
+  | { success: false };
+
+async function tryIdentifyFeatures(
+  args: Parameters<typeof identifyFeatures>[0]
+): Promise<InferenceResult> {
+  try {
+    const result = await identifyFeatures(args);
+    return {
+      success: true,
+      rawFeatures: result.features,
+      ignoredFeatures: result.ignoredFeatures,
+      tokensUsed: result.tokensUsed,
+    };
+  } catch (error) {
+    if (args.signal.aborted) {
+      throw error;
+    }
+    const errorMsg = error instanceof Error ? error.message : String(error);
+    args.logger.warn(`LLM inference failed: ${errorMsg}`);
+    return { success: false };
+  }
+}
+
+// ---------------------------------------------------------------------------
+// Single inferred-features iteration (internal)
+// ---------------------------------------------------------------------------
+
+interface RunInferredIterationOptions {
+  esClient: ElasticsearchClient;
+  streamName: string;
+  start: number;
+  end: number;
+  runId: string;
+  allFeatures: Feature[];
+  discoveredFeatures: Feature[];
+  excludedFeatures: Feature[];
+  inferenceClient: BoundInferenceClient;
+  systemPrompt: string;
+  logger: Logger;
+  signal: AbortSignal;
+  tuning: IterationTuningParams;
+  diverseOffset: number;
+}
+
+type InferredIterationResult =
+  | { hasDocuments: false; nextDiverseOffset: number }
+  | {
+      hasDocuments: true;
+      docsCount: number;
+      docIds: string[];
+      totalFilters: number;
+      filtersCapped: boolean;
+      hasFilteredDocuments: boolean;
+      nextDiverseOffset: number;
+      outcome:
+        | { state: 'failure' }
+        | {
+            state: 'success';
+            tokensUsed: ChatCompletionTokenCount;
+            newFeatures: Feature[];
+            updatedFeatures: Feature[];
+            ignoredFeatures: IgnoredFeature[];
+            codeIgnoredCount: number;
+          };
+    };
+
+async function runInferredIteration({
+  esClient,
+  streamName,
+  start,
+  end,
+  runId,
+  allFeatures,
+  discoveredFeatures,
+  excludedFeatures,
+  inferenceClient,
+  systemPrompt,
+  logger,
+  signal,
+  tuning,
+  diverseOffset,
+}: RunInferredIterationOptions): Promise<InferredIterationResult> {
+  const {
+    sample_size: sampleSize = DEFAULT_SIG_EVENTS_TUNING_CONFIG.sample_size,
+    entity_filtered_ratio:
+      entityFilteredRatio = DEFAULT_SIG_EVENTS_TUNING_CONFIG.entity_filtered_ratio,
+    diverse_ratio: diverseRatio = DEFAULT_SIG_EVENTS_TUNING_CONFIG.diverse_ratio,
+    max_entity_filters: maxEntityFilters = DEFAULT_SIG_EVENTS_TUNING_CONFIG.max_entity_filters,
+    max_excluded_features_in_prompt:
+      maxExcludedFeaturesInPrompt = DEFAULT_SIG_EVENTS_TUNING_CONFIG.max_excluded_features_in_prompt,
+    feature_ttl_days: featureTtlDays,
+    maxPreviouslyIdentifiedFeatures = DEFAULT_MAX_PREVIOUSLY_IDENTIFIED_FEATURES,
+  } = tuning;
+
+  const batchResult = await fetchSampleDocuments({
+    esClient,
+    index: streamName,
+    start,
+    end,
+    features: discoveredFeatures.filter(isFeatureWithFilter),
+    logger,
+    size: sampleSize,
+    entityFilteredRatio,
+    diverseRatio,
+    maxEntityFilters,
+    diverseOffset,
+  });
+
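+  // An empty batch means the sampling pool for this window is exhausted; short-circuit
+  // with the next offset so the caller can stop iterating instead of re-running inference.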
+  if (batchResult.documents.length === 0) {
+    return { hasDocuments: false, nextDiverseOffset: batchResult.nextOffset };
+  }
+
+  const { totalFilters, filtersCapped, hasFilteredDocuments } = batchResult;
+  const docsCount = batchResult.documents.length;
+  const docIds = batchResult.documents
+    .map((doc) => doc._id)
+    .filter((id): id is string => id != null);
+
+  const allKnownFeatures = allFeatures.filter((f) => !isComputedFeature(f));
+  const topRanked = [...allKnownFeatures]
+    .sort((a, b) => {
+      const aEntity = a.type === 'entity' ? 0 : 1;
+      const bEntity = b.type === 'entity' ? 0 : 1;
+      if (aEntity !== bEntity) return aEntity - bEntity;
+      return b.confidence - a.confidence;
+    })
+    .slice(0, maxPreviouslyIdentifiedFeatures);
+
+  const excludedSummaries: ExcludedFeatureSummary[] = excludedFeatures
+    .slice(0, maxExcludedFeaturesInPrompt)
+    .map(toFeatureProjection);
+
+  const inferResult = await tryIdentifyFeatures({
+    streamName,
+    sampleDocuments: batchResult.documents,
+    excludedFeatures: excludedSummaries,
+    inferenceClient,
+    systemPrompt,
+    logger,
+    signal,
+    previouslyIdentifiedFeatures: topRanked.map(toFeatureProjection),
+  });
+
+  if (!inferResult.success) {
+    return {
+      hasDocuments: true,
+      docsCount,
+      docIds,
+      totalFilters,
+      filtersCapped,
+      hasFilteredDocuments,
+      nextDiverseOffset: batchResult.nextOffset,
+      outcome: { state: 'failure' },
+    };
+  }
+
+  const { rawFeatures, ignoredFeatures, tokensUsed } = inferResult;
+
+  const { newFeatures, updatedFeatures, codeIgnoredCount } = reconcileInferredFeatures({
+    rawFeatures,
+    allKnownFeatures,
+    discoveredFeatures,
+    ignoredFeatures,
+    excludedFeatures,
+    featureTtlDays,
+    runId,
+    logger,
+  });
+
+  return {
+    hasDocuments: true,
+    docsCount,
+    docIds,
+    totalFilters,
+    filtersCapped,
+    hasFilteredDocuments,
+    nextDiverseOffset: batchResult.nextOffset,
+    outcome: {
+      state: 'success',
+      tokensUsed,
+      newFeatures,
+      updatedFeatures,
+      ignoredFeatures,
+      codeIgnoredCount,
+    },
+  };
+}
+
+// ---------------------------------------------------------------------------
+// Top-level: Identify inferred features (one iteration, full handler)
+// ---------------------------------------------------------------------------
+
+export interface IdentifyInferredFeaturesOptions {
+  esClient: ElasticsearchClient;
+  featureClient: FeatureClient;
+  soClient: SavedObjectsClientContract;
+  inferenceClient: BoundInferenceClient;
+  logger: Logger;
+  signal: AbortSignal;
+  streamName: string;
+  streamType: StreamType;
+  start: number;
+  end: number;
+  runId: string;
+  iteration?: number;
+  tuning?: IterationTuningParams;
+  diverseOffset?: number;
+  trackFeaturesIdentified?: (data: FeaturesIdentifiedTelemetry) => void;
+}
+
+export interface IdentifyInferredFeaturesResult {
+  hasDocuments: boolean;
+  docsCount: number;
+  docIds: string[];
+  discoveredFeatures: Feature[];
+  iterationResult: IterationResult;
+  nextDiverseOffset: number;
+}
+
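+/**
+ * Runs a single inferred-features iteration: fetch prior state, sample documents,
+ * call the LLM, reconcile, and persist. Features written by this run are recognized
+ * via their run_id and fed back into sampling on the next iteration.
+ *
+ * Illustrative call (values such as 'logs' are placeholders; the caller already
+ * holds the scoped clients):
+ *
+ * @example
+ * const { iterationResult, nextDiverseOffset } = await identifyInferredFeatures({
+ *   esClient, featureClient, soClient, inferenceClient, logger, signal,
+ *   streamName: 'logs', streamType, start, end, runId, iteration: 1,
+ * });
+ */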
+export async function identifyInferredFeatures({
+  esClient,
+  featureClient,
+  soClient,
+  inferenceClient,
+  logger,
+  signal,
+  streamName,
+  streamType,
+  start,
+  end,
+  runId,
+  iteration = 1,
+  tuning = {},
+  diverseOffset = 0,
+  trackFeaturesIdentified,
+}: IdentifyInferredFeaturesOptions): Promise<IdentifyInferredFeaturesResult> {
+  const [
+    { hits: allFeatures },
+    { hits: excludedFeatures },
+    { featurePromptOverride: systemPrompt },
+  ] = await Promise.all([
+    featureClient.getFeatures(streamName),
+    featureClient.getExcludedFeatures(streamName),
+    new PromptsConfigService({ soClient, logger }).getPrompt(),
+  ]);
+
+  const discoveredFeatures = allFeatures.filter((f) => !isComputedFeature(f) && f.run_id === runId);
+
+  const startedAt = Date.now();
+
+  const iterationResult = await runInferredIteration({
+    esClient,
+    streamName,
+    start,
+    end,
+    runId,
+    allFeatures,
+    discoveredFeatures,
+    excludedFeatures,
+    inferenceClient,
+    systemPrompt,
+    logger,
+    signal,
+    tuning,
+    diverseOffset,
+  });
+
+  if (!iterationResult.hasDocuments) {
+    return {
+      hasDocuments: false,
+      docsCount: 0,
+      docIds: [],
+      discoveredFeatures,
+      iterationResult: {
+        runId,
+        iteration,
+        durationMs: Date.now() - startedAt,
+        state: 'success',
+        tokensUsed: { ...EMPTY_TOKENS },
+        newFeatures: [],
+        updatedFeatures: [],
+      },
+      nextDiverseOffset: iterationResult.nextDiverseOffset,
+    };
+  }
+
+  const { docsCount, docIds, totalFilters, filtersCapped, hasFilteredDocuments, outcome } =
+    iterationResult;
+
+  const durationMs = Date.now() - startedAt;
+
+  const telemetryCtx: TelemetryContext = {
+    run_id: runId,
+    iteration,
+    stream_name: streamName,
+    stream_type: streamType,
+    docs_count: docsCount,
+    excluded_features_count: excludedFeatures.length,
+    total_filters: totalFilters,
+    filters_capped: filtersCapped,
+    has_filtered_documents: hasFilteredDocuments,
+  };
+
+  if (outcome.state !== 'success') {
+    const failedEntry: IterationResult = {
+      runId,
+      iteration,
+      durationMs,
+      state: 'failure',
+      tokensUsed: { ...EMPTY_TOKENS },
+      newFeatures: [],
+      updatedFeatures: [],
+    };
+
+    trackFeaturesIdentified?.(buildTelemetry(telemetryCtx, durationMs, { state: 'failure' }));
+
+    return {
+      hasDocuments: true,
+      docsCount,
+      docIds,
+      discoveredFeatures,
+      iterationResult: failedEntry,
+      nextDiverseOffset: iterationResult.nextDiverseOffset,
+    };
+  }
+
+  const { tokensUsed, newFeatures, updatedFeatures, ignoredFeatures, codeIgnoredCount } = outcome;
+
+  const allChanged = [...newFeatures, ...updatedFeatures];
+  if (allChanged.length > 0) {
+    await featureClient.bulk(
+      streamName,
+      allChanged.map((feature) => ({ index: { feature } }))
+    );
+  }
+
+  const discoveredMap = new Map<string, Feature>(discoveredFeatures.map((f) => [f.uuid, f]));
+  for (const feature of allChanged) {
+    discoveredMap.set(feature.uuid, feature);
+  }
+
+  const iterationEntry: IterationResult = {
+    runId,
+    iteration,
+    durationMs,
+    state: 'success',
+    tokensUsed,
+    newFeatures: newFeatures.map(toFeatureSummary),
+    updatedFeatures: updatedFeatures.map(toFeatureSummary),
+  };
+
+  trackFeaturesIdentified?.(
+    buildTelemetry(telemetryCtx, durationMs, {
+      state: 'success',
+      tokensUsed,
+      newCount: newFeatures.length,
+      updatedCount: updatedFeatures.length,
+      llmIgnoredCount: ignoredFeatures.length,
+      codeIgnoredCount,
+    })
+  );
+
+  return {
+    hasDocuments: true,
+    docsCount,
+    docIds,
+    discoveredFeatures: Array.from(discoveredMap.values()),
+    iterationResult: iterationEntry,
+    nextDiverseOffset: iterationResult.nextDiverseOffset,
+  };
+}
diff --git a/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/index.ts b/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/index.ts
new file mode 100644
index 0000000000000..f3324dec0bc68
--- /dev/null
+++ b/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/index.ts
@@ -0,0 +1,20 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */ + +export { MS_PER_DAY, EMPTY_TOKENS } from './iteration_state'; +export { deriveSuccessCount, deriveTotalTokensUsed } from './iteration_state'; + +export { identifyInferredFeatures, buildTelemetry } from './identify_inferred_features'; +export type { + FeaturesIdentifiedTelemetry, + TelemetryContext, + IdentifyInferredFeaturesOptions, + IdentifyInferredFeaturesResult, +} from './identify_inferred_features'; + +export { identifyComputedFeatures } from './identify_computed_features'; +export type { IdentifyComputedFeaturesOptions } from './identify_computed_features'; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/iteration_state.ts b/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/iteration_state.ts new file mode 100644 index 0000000000000..66331568ab143 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/iteration_state.ts @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ChatCompletionTokenCount } from '@kbn/inference-common'; +import type { Feature, IterationResult } from '@kbn/streams-schema'; +import { sumTokens } from '@kbn/streams-ai'; + +export const MS_PER_DAY = 24 * 60 * 60 * 1000; + +export const EMPTY_TOKENS: ChatCompletionTokenCount = { + prompt: 0, + completion: 0, + total: 0, + cached: 0, +}; + +export interface AccumulatedIterationState { + discoveredFeatures: Feature[]; + iterationResults: IterationResult[]; +} + +export function createEmptyAccumulatedState(): AccumulatedIterationState { + return { + discoveredFeatures: [], + iterationResults: [], + }; +} + +export function deriveSuccessCount(results: IterationResult[]): number { + return results.filter((r) => r.state === 'success').length; +} + +export function deriveTotalTokensUsed(results: IterationResult[]): ChatCompletionTokenCount { + return results.reduce((acc, r) => sumTokens(acc, r.tokensUsed), { ...EMPTY_TOKENS }); +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/reconcile_features.ts b/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/reconcile_features.ts new file mode 100644 index 0000000000000..bc3f8ede40ee0 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/sig_events/features/reconcile_features.ts @@ -0,0 +1,164 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { isEqual } from 'lodash'; +import { v4 as uuid, v5 as uuidv5 } from 'uuid'; +import type { Logger } from '@kbn/logging'; +import { + type Feature, + type BaseFeature, + isDuplicateFeature, + hasSameFingerprint, + mergeFeature, + toBaseFeature, +} from '@kbn/streams-schema'; +import type { IgnoredFeature } from '@kbn/streams-ai'; +import { DEFAULT_SIG_EVENTS_TUNING_CONFIG } from '../../../../common/sig_events_tuning_config'; +import { MS_PER_DAY } from './iteration_state'; + +export const toFeatureSummary = ({ id, title }: Feature) => ({ id, title: title ?? 
id });
+
+export const toFeatureProjection = ({
+  id,
+  type,
+  subtype,
+  title,
+  description,
+  properties,
+}: Feature) => ({
+  id,
+  type,
+  subtype,
+  title,
+  description,
+  properties,
+});
+
+export function createFeatureMetadata({
+  featureTtlDays = DEFAULT_SIG_EVENTS_TUNING_CONFIG.feature_ttl_days,
+  runId,
+}: {
+  featureTtlDays?: number;
+  runId: string;
+}) {
+  const now = Date.now();
+  return {
+    status: 'active' as const,
+    last_seen: new Date(now).toISOString(),
+    expires_at: new Date(now + featureTtlDays * MS_PER_DAY).toISOString(),
+    run_id: runId,
+  };
+}
+
+export function reconcileComputedFeatures({
+  computedFeatures,
+  streamName,
+  featureTtlDays,
+  runId,
+}: {
+  computedFeatures: BaseFeature[];
+  streamName: string;
+  featureTtlDays?: number;
+  runId: string;
+}): Feature[] {
+  const metadata = createFeatureMetadata({ featureTtlDays, runId });
+  return computedFeatures.map((feature) => ({
+    ...feature,
+    ...metadata,
+    uuid: uuidv5(`${streamName}:${feature.id}`, uuidv5.DNS),
+  }));
+}
+
+function filterExcluded(
+  rawFeatures: ReadonlyArray<BaseFeature>,
+  excludedFeatures: ReadonlyArray<Feature>,
+  logger: Logger
+): { nonExcluded: BaseFeature[]; codeIgnoredCount: number } {
+  const excludedByLowerId = new Set(excludedFeatures.map((f) => f.id.toLowerCase()));
+  let codeIgnoredCount = 0;
+
+  const nonExcluded = rawFeatures.filter((feature) => {
+    const lowerId = feature.id.toLowerCase();
+    if (excludedByLowerId.has(lowerId)) {
+      codeIgnoredCount++;
+      logger.debug(
+        `Dropping inferred feature [${feature.id}] because it matches an excluded feature by ID`
+      );
+      return false;
+    }
+    const fingerprintMatch = excludedFeatures.find((excluded) =>
+      hasSameFingerprint(feature, excluded)
+    );
+    if (fingerprintMatch) {
+      codeIgnoredCount++;
+      logger.debug(
+        `Dropping inferred feature [${feature.id}] because it matches excluded feature [${fingerprintMatch.id}] by fingerprint`
+      );
+      return false;
+    }
+    return true;
+  });
+
+  return { nonExcluded, codeIgnoredCount };
+}
+
+export function reconcileInferredFeatures({
+  rawFeatures,
+  allKnownFeatures,
+  discoveredFeatures,
+  ignoredFeatures,
+  excludedFeatures,
+  featureTtlDays,
+  runId,
+  logger,
+}: {
+  rawFeatures: BaseFeature[];
+  allKnownFeatures: Feature[];
+  discoveredFeatures: ReadonlyArray<Feature>;
+  ignoredFeatures: IgnoredFeature[];
+  excludedFeatures: ReadonlyArray<Feature>;
+  featureTtlDays?: number;
+  runId: string;
+  logger: Logger;
+}): { newFeatures: Feature[]; updatedFeatures: Feature[]; codeIgnoredCount: number } {
+  const metadata = createFeatureMetadata({ featureTtlDays, runId });
+  const newFeatures: Feature[] = [];
+  const updatedFeatures: Feature[] = [];
+
+  for (const ignored of ignoredFeatures) {
+    logger.debug(
+      `LLM ignored feature "${ignored.feature_id}" (matched excluded "${ignored.excluded_feature_id}"): ${ignored.reason}`
+    );
+  }
+
+  const { nonExcluded, codeIgnoredCount } = filterExcluded(rawFeatures, excludedFeatures, logger);
+
+  const discoveredSet = new Set(discoveredFeatures.map((f) => f.uuid));
+  const byLowerId = new Map<string, Feature>();
+  for (const f of allKnownFeatures) {
+    byLowerId.set(f.id.toLowerCase(), f);
+  }
+
+  for (const raw of nonExcluded) {
+    const match =
+      byLowerId.get(raw.id.toLowerCase()) ??
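+      // Case-insensitive id lookup missed: fall back to the heavier pairwise duplicate check.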
+ allKnownFeatures.find((f) => isDuplicateFeature(f, raw)); + + if (match) { + if (!discoveredSet.has(match.uuid)) { + updatedFeatures.push({ ...raw, ...metadata, uuid: match.uuid }); + } else { + const merged = mergeFeature(match, raw); + if (!isEqual(merged, toBaseFeature(match))) { + updatedFeatures.push({ ...merged, ...metadata, uuid: match.uuid }); + } + } + } else { + newFeatures.push({ ...raw, ...metadata, uuid: uuid() }); + } + } + + return { newFeatures, updatedFeatures, codeIgnoredCount }; +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_client.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_client.ts index b09b4d01db1dd..e5cc3103a7d80 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_client.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_client.ts @@ -32,6 +32,7 @@ import { FEATURE_EXCLUDED_AT, FEATURE_FILTER, FEATURE_EVIDENCE_DOC_IDS, + FEATURE_RUN_ID, FEATURE_SEARCH_EMBEDDING, } from './fields'; import type { FeatureStorageSettings } from './storage_settings'; @@ -589,6 +590,7 @@ function toStorage(stream: string, feature: Feature, inferenceAvailable: boolean [FEATURE_META]: feature.meta, [FEATURE_EXPIRES_AT]: feature.expires_at, [FEATURE_EXCLUDED_AT]: feature.excluded_at, + [FEATURE_RUN_ID]: feature.run_id, [FEATURE_TITLE]: feature.title, [FEATURE_FILTER]: feature.filter, ...(inferenceAvailable && embeddingText ? { [FEATURE_SEARCH_EMBEDDING]: embeddingText } : {}), @@ -613,6 +615,7 @@ function fromStorage(feature: StoredFeature): Feature { meta: feature[FEATURE_META], expires_at: feature[FEATURE_EXPIRES_AT], excluded_at: feature[FEATURE_EXCLUDED_AT], + run_id: feature[FEATURE_RUN_ID], title: feature[FEATURE_TITLE], filter: feature[FEATURE_FILTER], }; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/fields.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/fields.ts index 3f58d4d7f515b..bbbb30db0ac48 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/fields.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/fields.ts @@ -24,4 +24,5 @@ export const FEATURE_META = 'feature.meta'; export const FEATURE_EXPIRES_AT = 'feature.expires_at'; export const FEATURE_FILTER = 'feature.filter'; export const FEATURE_EXCLUDED_AT = 'feature.excluded_at'; +export const FEATURE_RUN_ID = 'feature.run_id'; export const FEATURE_SEARCH_EMBEDDING = 'feature.search_embedding'; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/storage_settings.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/storage_settings.ts index 163742db41b04..7b5e77f2e98c7 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/storage_settings.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/storage_settings.ts @@ -26,6 +26,7 @@ import { FEATURE_ID, FEATURE_FILTER, FEATURE_EVIDENCE_DOC_IDS, + FEATURE_RUN_ID, FEATURE_SEARCH_EMBEDDING, } from './fields'; @@ -50,6 +51,7 @@ export const featureStorageSettings = { [FEATURE_META]: types.object({ enabled: false }), [FEATURE_EXPIRES_AT]: types.date(), [FEATURE_EXCLUDED_AT]: types.date(), + [FEATURE_RUN_ID]: types.keyword(), [FEATURE_FILTER]: types.object({ enabled: false }), [FEATURE_SEARCH_EMBEDDING]: types.semantic_text(), }, diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/stored_feature.ts 
b/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/stored_feature.ts index 82dac1ce8be95..bccddf57e773a 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/stored_feature.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/stored_feature.ts @@ -27,6 +27,7 @@ import { FEATURE_PROPERTIES, FEATURE_SUBTYPE, FEATURE_FILTER, + FEATURE_RUN_ID, FEATURE_SEARCH_EMBEDDING, } from './fields'; @@ -49,6 +50,7 @@ export const storedFeatureSchema = z.object({ [FEATURE_EXCLUDED_AT]: z.string().optional(), [FEATURE_TITLE]: z.string().optional(), [FEATURE_FILTER]: conditionSchema.optional(), + [FEATURE_RUN_ID]: z.string().optional(), [FEATURE_SEARCH_EMBEDDING]: z.string().optional(), }); diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/features_identification/fetch_sample_documents.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/features_identification/fetch_sample_documents.ts index 3f45b63d9823f..c59cd6045cc36 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/features_identification/fetch_sample_documents.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/features_identification/fetch_sample_documents.ts @@ -44,6 +44,17 @@ export async function fetchSampleDocuments({ diverseOffset?: number; maxEntityFilters: number; }) { + if (entityFilteredRatio < 0 || diverseRatio < 0) { + throw new Error( + `entityFilteredRatio (${entityFilteredRatio}) and diverseRatio (${diverseRatio}) must be >= 0` + ); + } + if (entityFilteredRatio + diverseRatio > 1) { + throw new Error( + `entityFilteredRatio (${entityFilteredRatio}) + diverseRatio (${diverseRatio}) must be <= 1` + ); + } + const entityFilters = getEntityFilters(features, maxEntityFilters); if (entityFilters.length === 0) { diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/features_identification/index.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/features_identification/index.ts index e72dc19e1b973..77a1062f2119d 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/features_identification/index.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/features_identification/index.ts @@ -5,725 +5,292 @@ * 2.0. 
*/ -import { isEqual } from 'lodash'; import type { TaskDefinitionRegistry } from '@kbn/task-manager-plugin/server'; -import type { ElasticsearchClient } from '@kbn/core/server'; -import { - isInferenceProviderError, - type BoundInferenceClient, - type ChatCompletionTokenCount, -} from '@kbn/inference-common'; +import { isInferenceProviderError, type InferenceConnector } from '@kbn/inference-common'; import { type IdentifyFeaturesResult, type IterationResult, - type BaseFeature, type Feature, - isComputedFeature, - isDuplicateFeature, - mergeFeature, - toBaseFeature, getStreamTypeFromDefinition, - isFeatureWithFilter, } from '@kbn/streams-schema'; -import { - identifyFeatures, - generateAllComputedFeatures, - sumTokens, - type ExcludedFeatureSummary, - type IgnoredFeature, -} from '@kbn/streams-ai'; -import { v4 as uuid, v5 as uuidv5 } from 'uuid'; +import { v4 as uuid } from 'uuid'; import { getDeleteTaskRunResult } from '@kbn/task-manager-plugin/server/task'; import type { Logger, LogMeta } from '@kbn/logging'; import { STREAMS_SIG_EVENTS_KI_EXTRACTION_INFERENCE_FEATURE_ID } from '@kbn/streams-schema'; import { parseError } from '../../../streams/errors/parse_error'; -import { fetchSampleDocuments } from './fetch_sample_documents'; import { formatInferenceProviderError } from '../../../../routes/utils/create_connector_sse_error'; import { resolveConnectorForFeature } from '../../../../routes/utils/resolve_connector_for_feature'; import type { TaskContext } from '..'; import type { TaskParams } from '../../types'; -import { PromptsConfigService } from '../../../sig_events/saved_objects/prompts_config_service'; import { cancellableTask } from '../../cancellable_task'; import { isDefinitionNotFoundError } from '../../../streams/errors/definition_not_found_error'; -import { DEFAULT_SIG_EVENTS_TUNING_CONFIG } from '../../../../../common/sig_events_tuning_config'; +import { + buildTelemetry, + deriveSuccessCount, + deriveTotalTokensUsed, + identifyInferredFeatures, + identifyComputedFeatures, +} from '../../../sig_events/features'; -const toFeatureSummary = ({ id, title }: Feature) => ({ id, title: title ?? 
id });
+export interface FeaturesIdentificationTaskParams {
+  start: number;
+  end: number;
+  streamName: string;
+  connectorId?: string;
+}
 
-const EMPTY_TOKENS: ChatCompletionTokenCount = { prompt: 0, completion: 0, total: 0, cached: 0 };
-const MAX_PREVIOUSLY_IDENTIFIED_FEATURES = 100;
+export const FEATURES_IDENTIFICATION_TASK_TYPE = 'streams_features_identification';
 
-class FeatureAccumulator {
-  private readonly byUuid = new Map<string, Feature>();
-  private readonly byLowerId = new Map<string, Feature>();
-  private readonly fromStorage = new Set<string>();
+export function getFeaturesIdentificationTaskId(streamName: string) {
+  return `${FEATURES_IDENTIFICATION_TASK_TYPE}_${streamName}`;
+}
 
-  constructor(initialFeatures: Feature[] = []) {
-    for (const f of initialFeatures) {
-      this.add(f);
-      this.fromStorage.add(f.uuid);
-    }
-  }
+function isCancellationError(message: string): boolean {
+  return message.includes('ERR_CANCELED') || message.includes('Request was aborted');
+}
 
-  add(feature: Feature) {
-    this.byUuid.set(feature.uuid, feature);
-    this.byLowerId.set(feature.id.toLowerCase(), feature);
-  }
+function buildTaskResult(iterationResults: IterationResult[], durationMs: number) {
+  return {
+    durationMs,
+    iterations: iterationResults,
+    totalTokensUsed: deriveTotalTokensUsed(iterationResults),
+  };
+}
 
-  update(feature: Feature) {
-    if (!this.byUuid.has(feature.uuid)) {
-      return;
-    }
-    this.byUuid.set(feature.uuid, feature);
-    this.byLowerId.set(feature.id.toLowerCase(), feature);
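+// Runs one full features-identification task: resolves scoped clients and a connector,
+// loops identifyInferredFeatures until the sampling pool is exhausted, folds in computed
+// features, and reports completion or failure (plus telemetry) through the task client.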
+async function runFeaturesIdentification(
+  taskContext: TaskContext,
+  runContext: Parameters<typeof cancellableTask>[1]
+) {
+  if (!runContext.fakeRequest) {
+    throw new Error('Request is required to run this task');
   }
-
-  findDuplicate(candidate: BaseFeature): Feature | undefined {
-    return (
-      this.byLowerId.get(candidate.id.toLowerCase()) ??
-      this.getAll().find((f) => isDuplicateFeature(f, candidate))
+  const { fakeRequest } = runContext;
+
+  const {
+    start,
+    end,
+    streamName,
+    connectorId: connectorIdOverride,
+    _task,
+  } = runContext.taskInstance.params as TaskParams;
+  const taskParams = { start, end, streamName, connectorId: connectorIdOverride };
+
+  const taskDurationMs = () => Date.now() - new Date(_task.created_at).getTime();
+
+  const runId = uuid();
+  const emptyTelemetryCtx = {
+    run_id: runId,
+    iteration: 0,
+    stream_name: streamName,
+    stream_type: 'unknown' as const,
+    docs_count: 0,
+    excluded_features_count: 0,
+    total_filters: 0,
+    filters_capped: false,
+    has_filtered_documents: false,
+  };
+  const trackEmptyTelemetry = (telemetryState: 'canceled' | 'failure') => {
+    taskContext.telemetry.trackFeaturesIdentified(
+      buildTelemetry(emptyTelemetryCtx, 0, { state: telemetryState })
     );
-  }
-
-  isStoredFeature(feature: Feature): boolean {
-    return this.fromStorage.has(feature.uuid);
-  }
-
-  promoteFromStorage(featureUuid: string) {
-    this.fromStorage.delete(featureUuid);
-  }
-
-  getAll(): Feature[] {
-    return Array.from(this.byUuid.values());
-  }
-
-  getDiscovered(): Feature[] {
-    return this.getAll().filter((f) => !this.fromStorage.has(f.uuid));
-  }
-
-  getTopRanked(limit: number): Feature[] {
-    return this.getAll()
-      .sort((a, b) => {
-        const aEntity = a.type === 'entity' ? 0 : 1;
-        const bEntity = b.type === 'entity' ? 0 : 1;
-        if (aEntity !== bEntity) return aEntity - bEntity;
-        return b.confidence - a.confidence;
-      })
-      .slice(0, limit);
-  }
-
-  public get length(): number {
-    return this.byUuid.size;
-  }
-}
+  };
 
-export interface IterationTelemetry {
-  iteration: number;
-  state: 'success' | 'failure';
-  docsCount: number;
-  featuresNew: number;
-  featuresUpdated: number;
-  durationMs: number;
-  tokensUsed: ChatCompletionTokenCount;
-  ignoredFeaturesCount: number;
-  codeIgnoredCount: number;
-  totalFilters: number;
-  filtersCapped: boolean;
-  hasFilteredDocuments: boolean;
-}
 
-export interface IdentifyStreamFeaturesOptions {
-  streamName: string;
-  esClient: ElasticsearchClient;
-  start: number;
-  end: number;
-  existingFeatures: Feature[];
-  inferenceClient: BoundInferenceClient;
-  systemPrompt: string;
-  logger: Logger;
-  signal: AbortSignal;
-  maxIterations?: number;
-  maxExcludedFeaturesForPrompt?: number;
-  sampleSize?: number;
-  entityFilteredRatio?: number;
-  diverseRatio?: number;
-  maxEntityFilters?: number;
-  featureTtlDays?: number;
-  onIterationComplete?: (
-    telemetry: IterationTelemetry,
-    changes: { newFeatures: Feature[]; updatedFeatures: Feature[] }
-  ) => Promise<void>;
-  excludedFeatures: Feature[];
-}
+  const {
+    taskClient,
+    scopedClusterClient,
+    getFeatureClient,
+    streamsClient,
+    inferenceClient,
+    soClient,
+    tuningConfig,
+  } = await taskContext.getScopedClients({ request: fakeRequest });
+
+  const taskLogger = taskContext.logger.get('features_identification', streamName);
+
+  const [featureClient, connectorId] = await Promise.all([
+    getFeatureClient(),
+    connectorIdOverride
+      ? Promise.resolve(connectorIdOverride)
+      : resolveConnectorForFeature({
+          searchInferenceEndpoints: taskContext.server.searchInferenceEndpoints,
+          featureId: STREAMS_SIG_EVENTS_KI_EXTRACTION_INFERENCE_FEATURE_ID,
+          featureName: 'knowledge indicator extraction',
+          request: fakeRequest,
+        }),
+  ]);
+  taskLogger.debug(`Using connector ${connectorId} for knowledge indicator extraction`);
+
+  let hasTrackedIteration = false;
+  const iterationResults: IterationResult[] = [];
+  let discoveredFeatures: Feature[] = [];
+
+  try {
+    const stream = await streamsClient.getStream(streamName);
+
+    const streamType = getStreamTypeFromDefinition(stream);
+    const boundInferenceClient = inferenceClient.bindTo({ connectorId });
+    const esClient = scopedClusterClient.asCurrentUser;
+
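+    // Record that at least one iteration reported telemetry, so the catch block can
+    // tell "failed before any iteration" apart from partial progress.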
+    const trackFeaturesIdentified = (
+      data: Parameters<typeof taskContext.telemetry.trackFeaturesIdentified>[0]
+    ) => {
+      hasTrackedIteration = true;
+      taskContext.telemetry.trackFeaturesIdentified(data);
+    };
 
-export async function identifyStreamFeatures({
-  streamName,
-  esClient,
-  start,
-  end,
-  existingFeatures,
-  inferenceClient,
-  systemPrompt,
-  logger,
-  signal,
-  maxIterations = DEFAULT_SIG_EVENTS_TUNING_CONFIG.max_iterations,
-  maxExcludedFeaturesForPrompt = DEFAULT_SIG_EVENTS_TUNING_CONFIG.max_excluded_features_in_prompt,
-  sampleSize = DEFAULT_SIG_EVENTS_TUNING_CONFIG.sample_size,
-  entityFilteredRatio = DEFAULT_SIG_EVENTS_TUNING_CONFIG.entity_filtered_ratio,
-  diverseRatio = DEFAULT_SIG_EVENTS_TUNING_CONFIG.diverse_ratio,
-  maxEntityFilters = DEFAULT_SIG_EVENTS_TUNING_CONFIG.max_entity_filters,
-  featureTtlDays = DEFAULT_SIG_EVENTS_TUNING_CONFIG.feature_ttl_days,
-  onIterationComplete,
-  excludedFeatures,
-}: IdentifyStreamFeaturesOptions): Promise<{
-  features: Feature[];
-  tokensUsed: ChatCompletionTokenCount;
-}> {
-  const excludedSummaries: ExcludedFeatureSummary[] = excludedFeatures
-    .slice(0, maxExcludedFeaturesForPrompt)
-    .map(({ id, type, subtype, title, description, properties }) => ({
-      id,
-      type,
-      subtype,
-      title,
-      description,
-      properties,
-    }));
-
-  const known = new FeatureAccumulator(existingFeatures);
-
-  let totalTokensUsed: ChatCompletionTokenCount = { ...EMPTY_TOKENS };
-
-  let successCount = 0;
-  let failureCount = 0;
-  let diverseOffset = 0;
-
-  for (let i = 0; i < maxIterations; i++) {
-    if (signal.aborted) {
-      logger.debug('Feature identification aborted');
-      throw new Error('Request was aborted');
-    }
+    const { max_iterations: maxIterations } = tuningConfig;
+    let tuning = {
+      sample_size: tuningConfig.sample_size,
+      feature_ttl_days: tuningConfig.feature_ttl_days,
+      entity_filtered_ratio: tuningConfig.entity_filtered_ratio,
+      diverse_ratio: tuningConfig.diverse_ratio,
+      max_excluded_features_in_prompt: tuningConfig.max_excluded_features_in_prompt,
+      max_entity_filters: tuningConfig.max_entity_filters,
+    };
 
-    const batchResult = await fetchSampleDocuments({
-      esClient,
-      index: streamName,
+    const computedFeaturesPromise = identifyComputedFeatures({
+      stream,
+      streamName: stream.name,
       start,
       end,
-      features: known.getDiscovered().filter(isFeatureWithFilter),
-      logger,
-      size: sampleSize,
-      entityFilteredRatio,
-      diverseRatio,
-      maxEntityFilters,
-      diverseOffset,
+      esClient,
+      featureClient,
+      logger: taskLogger,
+      featureTtlDays: tuningConfig.feature_ttl_days,
+      runId,
     });
 
-    if (batchResult.nextOffset === diverseOffset) {
-      // Diverse sampling is expensive; once the pool is exhausted (offset did not advance), skip further diverse fetches.
-      diverseRatio = 0;
-    }
-    diverseOffset = batchResult.nextOffset;
-
-    if (batchResult.documents.length === 0) {
-      logger.debug('Stopping: no documents available for sampling');
-      break;
-    }
-
-    const previousFeatures = known.getTopRanked(MAX_PREVIOUSLY_IDENTIFIED_FEATURES);
-
-    logger.debug(
-      () =>
-        `Iteration ${i + 1}/${maxIterations}: processing ${
-          batchResult.documents.length
-        } documents, ${known.length} features known`
-    );
+    let diverseOffset = 0;
 
-    const iterationStart = Date.now();
-
-    const identifyFeaturesArgs = {
-      streamName,
-      sampleDocuments: batchResult.documents,
-      excludedFeatures: excludedSummaries,
-      inferenceClient,
-      systemPrompt,
-      logger,
-      signal,
-      previouslyIdentifiedFeatures: previousFeatures.map((f) => ({
-        id: f.id,
-        type: f.type,
-        subtype: f.subtype,
-        title: f.title,
-        description: f.description,
-        properties: f.properties,
-      })),
-    };
+    for (let i = 0; i < maxIterations; i++) {
+      if (runContext.abortController.signal.aborted) {
+        taskLogger.debug('Feature identification aborted');
+        throw new Error('Request was aborted');
+      }
 
-    const emitFailedIteration = (sinceMs: number) =>
-      onIterationComplete?.(
-        {
-          iteration: i + 1,
-          state: 'failure',
-          docsCount: batchResult.documents.length,
-          featuresNew: 0,
-          featuresUpdated: 0,
-          durationMs: Date.now() - sinceMs,
-          tokensUsed: EMPTY_TOKENS,
-          ignoredFeaturesCount: 0,
-          codeIgnoredCount: 0,
-          totalFilters: batchResult.totalFilters,
-          filtersCapped: batchResult.filtersCapped,
-          hasFilteredDocuments: batchResult.hasFilteredDocuments,
-        },
-        { newFeatures: [], updatedFeatures: [] }
+      taskLogger.debug(
+        () =>
+          `Iteration ${i + 1}/${maxIterations}: ` +
+          `${discoveredFeatures.length} features known, starting iteration`
       );
 
-    let result: Awaited<ReturnType<typeof identifyFeatures>>;
-    try {
-      result = await identifyFeatures(identifyFeaturesArgs);
-    } catch (error) {
-      if (signal.aborted) {
-        throw error;
-      }
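+      // Each iteration persists its own feature changes (featureClient.bulk) inside
+      // identifyInferredFeatures; the loop only threads offsets and discovered state.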
+      const result = await identifyInferredFeatures({
+        esClient,
+        featureClient,
+        soClient,
+        inferenceClient: boundInferenceClient,
+        logger: taskLogger,
+        signal: runContext.abortController.signal,
+        streamName: stream.name,
+        streamType,
+        start,
+        end,
+        runId,
+        iteration: i + 1,
+        tuning,
+        diverseOffset,
+        trackFeaturesIdentified,
+      });
+
+      if (!result.hasDocuments) {
+        taskLogger.debug('Stopping: no documents available for sampling');
+        break;
+      }
-      const errorMsg = error instanceof Error ? error.message : String(error);
-      logger.warn(`Iteration ${i + 1} failed (${errorMsg}), continuing`);
-      failureCount++;
-      await emitFailedIteration(iterationStart);
-      continue;
-    }
-    successCount++;
 
+      if (result.nextDiverseOffset === diverseOffset) {
+        tuning = { ...tuning, diverse_ratio: 0 };
+      }
+      diverseOffset = result.nextDiverseOffset;
-    const { features: rawFeatures, tokensUsed, ignoredFeatures } = result;
 
+      iterationResults.push(result.iterationResult);
+      discoveredFeatures = result.discoveredFeatures;
+    }
-    totalTokensUsed = sumTokens(totalTokensUsed, tokensUsed);
 
+    if (iterationResults.length > 0 && deriveSuccessCount(iterationResults) === 0) {
+      throw new Error(`All iterations failed for stream ${streamName}`);
+    }
-    const { newFeatures, updatedFeatures, codeIgnoredCount } = reconcileFeatures({
-      rawFeatures,
-      known,
-      ignoredFeatures,
-      logger,
-      excludedFeatures,
-      featureTtlDays,
+    const reconciledComputedFeatures = await computedFeaturesPromise.catch((err) => {
+      taskLogger.warn(`Computed features generation failed: ${parseError(err).message}`);
+      return [] as Awaited<ReturnType<typeof identifyComputedFeatures>>;
     });
+    const allFeatures = [...discoveredFeatures, ...reconciledComputedFeatures];
 
-    for (const feature of newFeatures) {
-      known.add(feature);
-    }
-    for (const feature of updatedFeatures) {
-      known.update(feature);
-      if (known.isStoredFeature(feature)) {
-        known.promoteFromStorage(feature.uuid);
-      }
+    await taskClient.complete(
+      _task,
+      taskParams,
+      { ...buildTaskResult(iterationResults, taskDurationMs()), features: allFeatures }
+    );
+  } catch (error) {
+    if (isDefinitionNotFoundError(error)) {
+      taskLogger.debug(
+        () =>
+          `Stream ${streamName} was deleted before features identification task started, skipping`
+      );
+      return getDeleteTaskRunResult();
     }
 
-    const iterationEntry: IterationTelemetry = {
-      iteration: i + 1,
-      state: 'success',
-      docsCount: batchResult.documents.length,
-      featuresNew: newFeatures.length,
-      featuresUpdated: updatedFeatures.length,
-      durationMs: Date.now() - iterationStart,
-      tokensUsed,
-      ignoredFeaturesCount: ignoredFeatures.length,
-      codeIgnoredCount,
-      totalFilters: batchResult.totalFilters,
-      filtersCapped: batchResult.filtersCapped,
-      hasFilteredDocuments: batchResult.hasFilteredDocuments,
-    };
+    const errorMessage = await resolveErrorMessage(error, inferenceClient, connectorId, taskLogger);
 
-    await onIterationComplete?.(iterationEntry, { newFeatures, updatedFeatures });
+    if (isCancellationError(errorMessage)) {
+      taskLogger.debug(() => `Task ${runContext.taskInstance.id} was canceled: ${errorMessage}`);
+      trackEmptyTelemetry('canceled');
+      return getDeleteTaskRunResult();
+    }
+
+    taskLogger.error(`Task ${runContext.taskInstance.id} failed: ${errorMessage}`, {
+      error,
+    } as LogMeta);
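+    // Even on failure, iterations that completed are preserved in the failed task result.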
-    logger.debug(
-      () =>
-        `Iteration ${i + 1}: found ${rawFeatures.length} features ` +
-        `(${newFeatures.length} new, ${updatedFeatures.length} updated), ${known.length} total known, ` +
-        `tokens: prompt=${tokensUsed.prompt} completion=${tokensUsed.completion} cached=${
-          tokensUsed.cached ?? 0
-        }`
-    );
-  }
+    await taskClient.fail(
+      _task,
+      taskParams,
+      errorMessage,
+      { ...buildTaskResult(iterationResults, taskDurationMs()), features: [] }
+    );
 
-  if (failureCount > 0 && successCount === 0) {
-    throw new Error(`All iterations failed for stream ${streamName}`);
-  }
+    if (!hasTrackedIteration) {
+      trackEmptyTelemetry('failure');
+    }
 
-  return {
-    features: known.getDiscovered(),
-    tokensUsed: totalTokensUsed,
-  };
-}
+    return getDeleteTaskRunResult();
+  }
+}
 
-export interface FeaturesIdentificationTaskParams {
-  start: number;
-  end: number;
-  streamName: string;
-  connectorId?: string;
-}
+async function resolveErrorMessage(
+  error: unknown,
+  inferenceClient: { getConnectorById: (id: string) => Promise<InferenceConnector> },
+  connectorId: string,
+  logger: Logger
+): Promise<string> {
+  if (!isInferenceProviderError(error)) {
+    return parseError(error).message;
+  }
 
-export const FEATURES_IDENTIFICATION_TASK_TYPE = 'streams_features_identification';
+  let connector;
+  try {
+    connector = await inferenceClient.getConnectorById(connectorId);
+  } catch (connectorErr) {
+    logger.warn(
+      `Failed to fetch connector ${connectorId} for error enrichment: ${
+        connectorErr instanceof Error ? connectorErr.message : String(connectorErr)
+      }`
+    );
+  }
 
-export function getFeaturesIdentificationTaskId(streamName: string) {
-  return `${FEATURES_IDENTIFICATION_TASK_TYPE}_${streamName}`;
+  return connector ? formatInferenceProviderError(error, connector) : parseError(error).message;
 }
 
 export function createStreamsFeaturesIdentificationTask(taskContext: TaskContext) {
   return {
     [FEATURES_IDENTIFICATION_TASK_TYPE]: {
-      createTaskRunner: (runContext) => {
-        return {
-          run: cancellableTask(
-            async () => {
-              if (!runContext.fakeRequest) {
-                throw new Error('Request is required to run this task');
-              }
-              const { fakeRequest } = runContext;
-
-              const {
-                start,
-                end,
-                streamName,
-                connectorId: connectorIdOverride,
-                _task,
-              } = runContext.taskInstance.params as TaskParams;
-
-              const runId = uuid();
-              const trackEmptyTelemetry = (state: 'canceled' | 'failure') => {
-                taskContext.telemetry.trackFeaturesIdentified({
-                  run_id: runId,
-                  iteration: 0,
-                  stream_name: streamName,
-                  stream_type: 'unknown',
-                  state,
-                  docs_count: 0,
-                  features_new: 0,
-                  features_updated: 0,
-                  input_tokens_used: 0,
-                  output_tokens_used: 0,
-                  total_tokens_used: 0,
-                  cached_tokens_used: 0,
-                  duration_ms: 0,
-                  total_filters: 0,
-                  filters_capped: false,
-                  has_filtered_documents: false,
-                  excluded_features_count: 0,
-                  llm_ignored_count: 0,
-                  code_ignored_count: 0,
-                });
-              };
-
-              const {
-                taskClient,
-                scopedClusterClient,
-                getFeatureClient,
-                streamsClient,
-                inferenceClient,
-                soClient,
-                tuningConfig,
-              } = await taskContext.getScopedClients({
-                request: runContext.fakeRequest,
-              });
-
-              const featureClient = await getFeatureClient();
-
-              const taskLogger = taskContext.logger.get('features_identification', streamName);
-              const connectorId =
-                connectorIdOverride ??
- (await resolveConnectorForFeature({ - searchInferenceEndpoints: taskContext.server.searchInferenceEndpoints, - featureId: STREAMS_SIG_EVENTS_KI_EXTRACTION_INFERENCE_FEATURE_ID, - featureName: 'knowledge indicator extraction', - request: fakeRequest, - })); - taskLogger.debug(`Using connector ${connectorId} for knowledge indicator extraction`); - - let hasTrackedIteration = false; - const iterationResults: IterationResult[] = []; - try { - const [ - stream, - { hits: allExistingFeatures }, - { hits: excludedFeatures }, - { featurePromptOverride }, - ] = await Promise.all([ - streamsClient.getStream(streamName), - featureClient.getFeatures(streamName), - featureClient.getExcludedFeatures(streamName), - new PromptsConfigService({ - soClient, - logger: taskLogger, - }).getPrompt(), - ]); - - const streamType = getStreamTypeFromDefinition(stream); - const boundInferenceClient = inferenceClient.bindTo({ connectorId }); - const esClient = scopedClusterClient.asCurrentUser; - - const existingFeatures = allExistingFeatures.filter((f) => !isComputedFeature(f)); - - const [ - { features: inferredFeatures, tokensUsed: totalTokensUsed }, - computedFeatures, - ] = await Promise.all([ - identifyStreamFeatures({ - streamName: stream.name, - esClient, - start, - end, - existingFeatures, - excludedFeatures, - inferenceClient: boundInferenceClient, - logger: taskLogger, - signal: runContext.abortController.signal, - systemPrompt: featurePromptOverride, - maxIterations: tuningConfig.max_iterations, - maxExcludedFeaturesForPrompt: tuningConfig.max_excluded_features_in_prompt, - sampleSize: tuningConfig.sample_size, - entityFilteredRatio: tuningConfig.entity_filtered_ratio, - diverseRatio: tuningConfig.diverse_ratio, - maxEntityFilters: tuningConfig.max_entity_filters, - featureTtlDays: tuningConfig.feature_ttl_days, - onIterationComplete: async (it, changes) => { - const allChanged = [...changes.newFeatures, ...changes.updatedFeatures]; - if (allChanged.length > 0) { - await featureClient.bulk( - stream.name, - allChanged.map((feature) => ({ index: { feature } })) - ); - } - iterationResults.push({ - iteration: it.iteration, - durationMs: it.durationMs, - state: it.state, - tokensUsed: it.tokensUsed, - newFeatures: changes.newFeatures.map(toFeatureSummary), - updatedFeatures: changes.updatedFeatures.map(toFeatureSummary), - }); - taskContext.telemetry.trackFeaturesIdentified({ - run_id: runId, - iteration: it.iteration, - stream_name: streamName, - stream_type: streamType, - state: it.state, - docs_count: it.docsCount, - features_new: it.featuresNew, - features_updated: it.featuresUpdated, - input_tokens_used: it.tokensUsed.prompt, - output_tokens_used: it.tokensUsed.completion, - total_tokens_used: it.tokensUsed.total, - cached_tokens_used: it.tokensUsed.cached ?? 
0, - duration_ms: it.durationMs, - excluded_features_count: excludedFeatures.length, - llm_ignored_count: it.ignoredFeaturesCount, - code_ignored_count: it.codeIgnoredCount, - total_filters: it.totalFilters, - filters_capped: it.filtersCapped, - has_filtered_documents: it.hasFilteredDocuments, - }); - hasTrackedIteration = true; - }, - }), - generateAllComputedFeatures({ - stream, - start, - end, - esClient, - logger: taskContext.logger.get('computed_features', streamName), - }), - ]); - - const durationMs = Date.now() - new Date(_task.created_at).getTime(); - - const reconciledComputedFeatures = reconcileComputedFeatures({ - computedFeatures, - streamName, - featureTtlDays: tuningConfig.feature_ttl_days, - }); - - if (reconciledComputedFeatures.length > 0) { - await featureClient.bulk( - stream.name, - reconciledComputedFeatures.map((feature) => ({ index: { feature } })) - ); - } - - const allFeatures = [...inferredFeatures, ...reconciledComputedFeatures]; - - await taskClient.complete( - _task, - { start, end, streamName, connectorId: connectorIdOverride }, - { - features: allFeatures, - durationMs, - iterations: iterationResults, - totalTokensUsed, - } - ); - } catch (error) { - const failDurationMs = Date.now() - new Date(_task.created_at).getTime(); - - if (isDefinitionNotFoundError(error)) { - taskLogger.debug( - () => - `Stream ${streamName} was deleted before features identification task started, skipping` - ); - return getDeleteTaskRunResult(); - } - - let connector; - try { - connector = await inferenceClient.getConnectorById(connectorId); - } catch (connectorErr) { - taskLogger.warn( - `Failed to fetch connector ${connectorId} for error enrichment: ${ - connectorErr instanceof Error ? connectorErr.message : String(connectorErr) - }` - ); - } - - const errorMessage = - isInferenceProviderError(error) && connector - ? 
formatInferenceProviderError(error, connector) - : parseError(error).message; - - if ( - errorMessage.includes('ERR_CANCELED') || - errorMessage.includes('Request was aborted') - ) { - taskLogger.debug( - () => `Task ${runContext.taskInstance.id} was canceled: ${errorMessage}` - ); - trackEmptyTelemetry('canceled'); - return getDeleteTaskRunResult(); - } - - taskLogger.error(`Task ${runContext.taskInstance.id} failed: ${errorMessage}`, { - error, - } as LogMeta); - - const partialTokensUsed = iterationResults.reduce( - (acc, iter) => sumTokens(acc, iter.tokensUsed), - { ...EMPTY_TOKENS } - ); - - await taskClient.fail( - _task, - { start, end, streamName, connectorId: connectorIdOverride }, - errorMessage, - { - features: [], - durationMs: failDurationMs, - iterations: iterationResults, - totalTokensUsed: partialTokensUsed, - } - ); - - if (!hasTrackedIteration) { - trackEmptyTelemetry('failure'); - } - - return getDeleteTaskRunResult(); - } - }, - runContext, - taskContext - ), - }; - }, + createTaskRunner: (runContext) => ({ + run: cancellableTask( + () => runFeaturesIdentification(taskContext, runContext), + runContext, + taskContext + ), + }), }, } satisfies TaskDefinitionRegistry; } - -function createFeatureMetadata(featureTtlDays: number) { - const now = Date.now(); - const featureTtlMs = featureTtlDays * 24 * 60 * 60 * 1000; - return { - status: 'active' as const, - last_seen: new Date(now).toISOString(), - expires_at: new Date(now + featureTtlMs).toISOString(), - }; -} - -/** Compares only domain fields (ignores uuid, status, timestamps) */ -const hasChanged = (updated: BaseFeature, current: Feature): boolean => - !isEqual(updated, toBaseFeature(current)); - -function reconcileFeatures({ - rawFeatures, - known, - ignoredFeatures, - excludedFeatures, - logger, - featureTtlDays, -}: { - rawFeatures: BaseFeature[]; - known: FeatureAccumulator; - ignoredFeatures: IgnoredFeature[]; - excludedFeatures: Feature[]; - logger: Logger; - featureTtlDays: number; -}): { newFeatures: Feature[]; updatedFeatures: Feature[]; codeIgnoredCount: number } { - const newFeatures: Feature[] = []; - const updatedFeatures: Feature[] = []; - const metadata = createFeatureMetadata(featureTtlDays); - - for (const ignored of ignoredFeatures) { - logger.debug( - () => - `LLM ignored feature "${ignored.feature_id}" (matched excluded "${ignored.excluded_feature_id}"): ${ignored.reason}` - ); - } - - // Server-side safety net: check against ALL excluded features (not just the subset sent to the LLM) - let codeIgnoredCount = 0; - const nonExcludedInferredFeatures = rawFeatures.filter((feature) => { - const matchingExcluded = excludedFeatures.find((excluded) => - isDuplicateFeature(feature, excluded) - ); - if (matchingExcluded) { - codeIgnoredCount++; - logger.debug( - () => - `Dropping inferred feature [${feature.id}] because it matches excluded feature [${matchingExcluded.id}]` - ); - return false; - } - return true; - }); - - for (const raw of nonExcludedInferredFeatures) { - const match = known.findDuplicate(raw); - - if (match) { - if (known.isStoredFeature(match)) { - // Stored-origin: always update to refresh last_seen / expires_at - updatedFeatures.push({ ...raw, ...metadata, uuid: match.uuid }); - } else { - // Intra-run: merge properties accumulated across iterations of this run - const merged = mergeFeature(match, raw); - if (hasChanged(merged, match)) { - updatedFeatures.push({ ...merged, ...metadata, uuid: match.uuid }); - } - } - } else { - newFeatures.push({ ...raw, ...metadata, uuid: uuid() }); - } - 
-
-function reconcileComputedFeatures({
-  computedFeatures,
-  streamName,
-  featureTtlDays,
-}: {
-  computedFeatures: BaseFeature[];
-  streamName: string;
-  featureTtlDays: number;
-}): Feature[] {
-  const metadata = createFeatureMetadata(featureTtlDays);
-  return computedFeatures.map((feature) => ({
-    ...feature,
-    ...metadata,
-    uuid: uuidv5(`${streamName}:${feature.id}`, uuidv5.DNS),
-  }));
-}
diff --git a/x-pack/platform/plugins/shared/streams/server/routes/index.ts b/x-pack/platform/plugins/shared/streams/server/routes/index.ts
index 50f2dec7995c5..8a012a1c30c10 100644
--- a/x-pack/platform/plugins/shared/streams/server/routes/index.ts
+++ b/x-pack/platform/plugins/shared/streams/server/routes/index.ts
@@ -28,6 +28,7 @@ import { attachmentRoutes } from './attachments/route';
 import { internalAttachmentRoutes } from './internal/attachments/route';
 import { internalDescriptionGenerationRoutes } from './internal/sig_events/description_generation/route';
 import { featureRoutes as internalFeatureRoutes } from './internal/sig_events/features/route';
+import { identifyFeaturesRoutes as internalIdentifyFeaturesRoutes } from './internal/sig_events/features/identify_route';
 import { internalInsightsRoutes } from './internal/sig_events/insights/route';
 import { internalTasksRoutes } from './internal/streams/tasks/route';
 import { internalOnboardingRoutes } from './internal/streams/onboarding/route';
@@ -53,6 +54,7 @@ export const streamsRouteRepository = {
   ...internalAttachmentRoutes,
   ...internalDescriptionGenerationRoutes,
   ...internalFeatureRoutes,
+  ...internalIdentifyFeaturesRoutes,
   ...internalInsightsRoutes,
   ...internalTasksRoutes,
   ...internalOnboardingRoutes,
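Worth noting in the deleted reconcileComputedFeatures: computed features derive their uuid from uuidv5 over `${streamName}:${feature.id}` rather than a random v4, so every run produces the same id for the same feature and re-indexing overwrites instead of duplicating. A quick illustration (uuidv5.DNS is just a well-known namespace constant from the uuid package; any fixed namespace gives the same stability):

import { v5 as uuidv5 } from 'uuid';

// Same stream + feature id always yields the same uuid, so repeated runs
// upsert the same document instead of accumulating duplicates.
const a = uuidv5('logs.nginx:error_rate', uuidv5.DNS);
const b = uuidv5('logs.nginx:error_rate', uuidv5.DNS);
console.log(a === b); // true — stable across processes and runs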
diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/sig_events/features/identify_route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/sig_events/features/identify_route.ts
new file mode 100644
index 0000000000000..31c5f985006a0
--- /dev/null
+++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/sig_events/features/identify_route.ts
@@ -0,0 +1,267 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { v4 as uuidv4 } from 'uuid';
+import { z } from '@kbn/zod/v4';
+import {
+  getStreamTypeFromDefinition,
+  STREAMS_SIG_EVENTS_KI_EXTRACTION_INFERENCE_FEATURE_ID,
+} from '@kbn/streams-schema';
+import { isInferenceProviderError } from '@kbn/inference-common';
+import { createServerRoute } from '../../../create_server_route';
+import { assertSignificantEventsAccess } from '../../../utils/assert_significant_events_access';
+import { STREAMS_API_PRIVILEGES } from '../../../../../common/constants';
+import { resolveConnectorForFeature } from '../../../utils/resolve_connector_for_feature';
+import { getRequestAbortSignal } from '../../../utils/get_request_abort_signal';
+import { formatInferenceProviderError } from '../../../utils/create_connector_sse_error';
+import {
+  MS_PER_DAY,
+  buildTelemetry,
+  identifyInferredFeatures,
+  identifyComputedFeatures,
+} from '../../../../lib/sig_events/features';
+
+// ---------------------------------------------------------------------------
+// Route 1: Identify inferred features (one iteration: sample + infer + reconcile)
+// ---------------------------------------------------------------------------
+
+const identifyInferredFeaturesRoute = createServerRoute({
+  endpoint: 'POST /internal/streams/{streamName}/features/_identify/inferred',
+  options: {
+    access: 'internal',
+    summary: 'Sample documents, run LLM inference, and reconcile KI features for one iteration',
+    timeout: { idleSocket: 300_000 },
+  },
+  security: {
+    authz: {
+      requiredPrivileges: [STREAMS_API_PRIVILEGES.manage],
+    },
+  },
+  params: z.object({
+    path: z.object({ streamName: z.string() }),
+    body: z
+      .object({
+        connectorId: z.string().optional(),
+        start: z.number().optional(),
+        end: z.number().optional(),
+        runId: z.string().optional(),
+        iteration: z.number().optional(),
+        featureTtlDays: z.number().optional(),
+        sampleSize: z.number().optional(),
+        entityFilteredRatio: z.number().min(0).max(1).optional(),
+        diverseRatio: z.number().min(0).max(1).optional(),
+        maxEntityFilters: z.number().optional(),
+        maxExcludedFeaturesInPrompt: z.number().optional(),
+        maxPreviouslyIdentifiedFeatures: z.number().optional(),
+        diverseOffset: z.number().min(0).optional(),
+      })
+      .nullable()
+      .optional(),
+  }),
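The body schema above validates the tuning knobs at the HTTP boundary: the two ratio parameters are constrained to [0, 1], and `.nullable().optional()` accepts both an explicitly null and an entirely absent body. A small sketch of that behavior, using plain zod for illustration (the route itself imports z from @kbn/zod/v4):

import { z } from 'zod';

const body = z
  .object({ diverseRatio: z.number().min(0).max(1).optional() })
  .nullable()
  .optional();

console.log(body.safeParse(undefined).success); // true — body omitted entirely
console.log(body.safeParse(null).success); // true — explicit null body
console.log(body.safeParse({ diverseRatio: 0.25 }).success); // true
console.log(body.safeParse({ diverseRatio: 1.5 }).success); // false — ratio out of range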
+  handler: async ({ params, request, getScopedClients, server, logger, telemetry }) => {
+    const {
+      scopedClusterClient,
+      getFeatureClient,
+      streamsClient,
+      inferenceClient,
+      soClient,
+      tuningConfig,
+      licensing,
+      uiSettingsClient,
+    } = await getScopedClients({ request });
+
+    await assertSignificantEventsAccess({ server, licensing, uiSettingsClient });
+
+    const { streamName } = params.path;
+    const routeLogger = logger.get('features_identification', 'inferred', streamName);
+    const now = Date.now();
+    const {
+      start = now - MS_PER_DAY,
+      end = now,
+      connectorId: connectorIdOverride,
+      runId = uuidv4(),
+      iteration,
+      featureTtlDays = tuningConfig.feature_ttl_days,
+      sampleSize = tuningConfig.sample_size,
+      entityFilteredRatio = tuningConfig.entity_filtered_ratio,
+      diverseRatio = tuningConfig.diverse_ratio,
+      maxEntityFilters = tuningConfig.max_entity_filters,
+      maxExcludedFeaturesInPrompt = tuningConfig.max_excluded_features_in_prompt,
+      maxPreviouslyIdentifiedFeatures,
+      diverseOffset,
+    } = params.body ?? {};
+
+    const [connectorId, stream, featureClient] = await Promise.all([
+      connectorIdOverride
+        ? Promise.resolve(connectorIdOverride)
+        : resolveConnectorForFeature({
+            searchInferenceEndpoints: server.searchInferenceEndpoints,
+            featureId: STREAMS_SIG_EVENTS_KI_EXTRACTION_INFERENCE_FEATURE_ID,
+            featureName: 'knowledge indicator extraction',
+            request,
+          }),
+      streamsClient.getStream(streamName),
+      getFeatureClient(),
+    ]);
+
+    const streamType = getStreamTypeFromDefinition(stream);
+
+    try {
+      return await identifyInferredFeatures({
+        esClient: scopedClusterClient.asCurrentUser,
+        featureClient,
+        soClient,
+        inferenceClient: inferenceClient.bindTo({ connectorId }),
+        logger: routeLogger,
+        signal: getRequestAbortSignal(request),
+        streamName,
+        streamType,
+        start,
+        end,
+        runId,
+        iteration,
+        tuning: {
+          feature_ttl_days: featureTtlDays,
+          sample_size: sampleSize,
+          entity_filtered_ratio: entityFilteredRatio,
+          diverse_ratio: diverseRatio,
+          max_entity_filters: maxEntityFilters,
+          max_excluded_features_in_prompt: maxExcludedFeaturesInPrompt,
+          maxPreviouslyIdentifiedFeatures,
+        },
+        diverseOffset,
+        trackFeaturesIdentified: (data) => telemetry.trackFeaturesIdentified(data),
+      });
+    } catch (error) {
+      routeLogger.error(
+        `Inferred feature identification failed for stream [${streamName}]: ${
+          error instanceof Error ? error.message : String(error)
+        }`
+      );
+
+      telemetry.trackFeaturesIdentified(
+        buildTelemetry(
+          {
+            run_id: runId,
+            iteration: iteration ?? 1,
+            stream_name: streamName,
+            stream_type: streamType,
+            docs_count: 0,
+            excluded_features_count: 0,
+            total_filters: 0,
+            filters_capped: false,
+            has_filtered_documents: false,
+          },
+          Date.now() - now,
+          { state: 'failure' }
+        )
+      );
+
+      if (isInferenceProviderError(error)) {
+        const connector = await inferenceClient
+          .getConnectorById(connectorId)
+          .catch(() => undefined);
+        if (connector) {
+          throw new Error(formatInferenceProviderError(error, connector));
+        }
+      }
+
+      throw error;
+    }
+  },
+});
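The handler resolves every optional knob in one destructuring statement: `params.body ?? {}` turns a null or missing body into an empty object, and each default falls back either to a computed value (a 24-hour window ending now, a fresh runId) or to the server-side tuningConfig. A condensed, hypothetical version of the pattern (MS_PER_DAY and tuningConfig here are stand-ins for the real imports):

const MS_PER_DAY = 24 * 60 * 60 * 1000;
const tuningConfig = { sample_size: 500 };

function resolveParams(
  body: { start?: number; end?: number; sampleSize?: number } | null | undefined
) {
  const now = Date.now();
  const {
    start = now - MS_PER_DAY, // default window: the last 24 hours
    end = now,
    sampleSize = tuningConfig.sample_size, // unset knobs fall back to server tuning
  } = body ?? {}; // a null or missing body behaves like an empty object
  return { start, end, sampleSize };
}

console.log(resolveParams(null)); // all defaults
console.log(resolveParams({ sampleSize: 100 })); // override a single knob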
+
+// ---------------------------------------------------------------------------
+// Route 2: Identify computed features (generate and persist computed KI features)
+// ---------------------------------------------------------------------------
+
+const identifyComputedFeaturesRoute = createServerRoute({
+  endpoint: 'POST /internal/streams/{streamName}/features/_identify/computed',
+  options: {
+    access: 'internal',
+    summary: 'Generate and persist computed KI features for a stream',
+  },
+  security: {
+    authz: {
+      requiredPrivileges: [STREAMS_API_PRIVILEGES.manage],
+    },
+  },
+  params: z.object({
+    path: z.object({ streamName: z.string() }),
+    body: z
+      .object({
+        start: z.number().optional(),
+        end: z.number().optional(),
+        runId: z.string().optional(),
+        featureTtlDays: z.number().optional(),
+      })
+      .nullable()
+      .optional(),
+  }),
+  handler: async ({ params, request, getScopedClients, server, logger }) => {
+    const {
+      scopedClusterClient,
+      getFeatureClient,
+      streamsClient,
+      tuningConfig,
+      licensing,
+      uiSettingsClient,
+    } = await getScopedClients({ request });
+
+    await assertSignificantEventsAccess({ server, licensing, uiSettingsClient });
+
+    const { streamName } = params.path;
+    const routeLogger = logger.get('features_identification', 'computed', streamName);
+    const now = Date.now();
+    const {
+      start = now - MS_PER_DAY,
+      end = now,
+      runId = uuidv4(),
+      featureTtlDays = tuningConfig.feature_ttl_days,
+    } = params.body ?? {};
+
+    const [featureClient, stream] = await Promise.all([
+      getFeatureClient(),
+      streamsClient.getStream(streamName),
+    ]);
+
+    try {
+      const computedFeatures = await identifyComputedFeatures({
+        stream,
+        streamName,
+        start,
+        end,
+        esClient: scopedClusterClient.asCurrentUser,
+        featureClient,
+        logger: routeLogger,
+        featureTtlDays,
+        runId,
+      });
+
+      return {
+        computedFeatures,
+        computedFeaturesCount: computedFeatures.length,
+      };
+    } catch (error) {
+      routeLogger.error(
+        `Computed feature identification failed for stream [${streamName}]: ${
+          error instanceof Error ? error.message : String(error)
+        }`
+      );
+      throw error;
+    }
+  },
+});
+
+// ---------------------------------------------------------------------------
+// Exports
+// ---------------------------------------------------------------------------
+
+export const identifyFeaturesRoutes = {
+  ...identifyInferredFeaturesRoute,
+  ...identifyComputedFeaturesRoute,
+};
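A hypothetical end-to-end caller, sketching how the two endpoints are meant to compose: computed features once up front, then one or more inferred iterations sharing a runId so their results group under the same run. The fetch calls and header set are assumptions (kbn-xsrf is required for Kibana POSTs; the internal-origin header may be needed when internal API restriction is enabled), not part of this PR:

import { randomUUID } from 'node:crypto';

async function identifyAll(kibanaUrl: string, streamName: string) {
  const runId = randomUUID();
  const headers = {
    'Content-Type': 'application/json',
    'kbn-xsrf': 'true', // required by Kibana for state-changing requests
    'x-elastic-internal-origin': 'Kibana', // assumption: needed for internal-access routes
  };

  // Computed features are deterministic; run them once up front.
  await fetch(`${kibanaUrl}/internal/streams/${streamName}/features/_identify/computed`, {
    method: 'POST',
    headers,
    body: JSON.stringify({ runId }),
  });

  // Inferred features go through the LLM; iterate, reusing the same runId.
  for (let iteration = 1; iteration <= 3; iteration++) {
    await fetch(`${kibanaUrl}/internal/streams/${streamName}/features/_identify/inferred`, {
      method: 'POST',
      headers,
      body: JSON.stringify({ runId, iteration }),
    });
  }
}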