diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/index.ts b/x-pack/platform/packages/shared/kbn-streams-schema/index.ts index 0a95c837ad3b3..e35bb3a773151 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/index.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/index.ts @@ -20,6 +20,8 @@ export { routingDefinitionListSchema, } from './src/models/ingest/routing'; +export { getStreamTypeFromDefinition } from './src/helpers/get_stream_type_from_definition'; +export type { StreamType } from './src/helpers/get_stream_type_from_definition'; export { isRootStreamDefinition } from './src/helpers/is_root'; export { isOtelStream } from './src/helpers/is_otel_stream'; export { getIndexPatternsForStream } from './src/helpers/hierarchy_helpers'; diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/helpers/get_stream_type_from_definition.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/helpers/get_stream_type_from_definition.ts new file mode 100644 index 0000000000000..5d69398312cc7 --- /dev/null +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/helpers/get_stream_type_from_definition.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Streams } from '../models/streams'; + +export type StreamType = 'wired' | 'classic' | 'unknown'; + +export function getStreamTypeFromDefinition(definition: Streams.all.Definition): StreamType { + if (Streams.WiredStream.Definition.is(definition)) { + return 'wired'; + } + + if (Streams.ClassicStream.Definition.is(definition)) { + return 'classic'; + } + + return 'unknown'; +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_type_registry.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_type_registry.ts index 3de508c3a1b45..5b97f1e8f51dc 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_type_registry.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/feature/feature_type_registry.ts @@ -15,6 +15,11 @@ import type { StoredFeature } from './stored_feature'; import { SystemFeatureHandler } from './handlers/system'; import { FEATURE_TYPE } from './fields'; +export interface IdentifyFeaturesResult { + features: Feature[]; + tokensUsed: ChatCompletionTokenCount; +} + export class FeatureTypeRegistry { private handlers = new Map(); @@ -58,7 +63,7 @@ export class FeatureTypeRegistry { async identifyFeatures( options: Omit - ): Promise<{ features: Feature[]; tokensUsed: ChatCompletionTokenCount }> { + ): Promise { options.logger.debug(`Identifying features for stream ${options.stream.name}`); options.logger.trace('Describing dataset for feature identification'); diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/acknowledging_incomplete_error.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/acknowledging_incomplete_error.ts new file mode 100644 index 0000000000000..80a795ecebd4a --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/acknowledging_incomplete_error.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class AcknowledgingIncompleteError extends Error { + constructor(message: string) { + super(message); + this.name = 'AcknowledgingIncompleteError'; + } +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/cancellable_task.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/cancellable_task.ts new file mode 100644 index 0000000000000..8b693e8a85715 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/cancellable_task.ts @@ -0,0 +1,85 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { RunContext } from '@kbn/task-manager-plugin/server'; +import type { RunFunction } from '@kbn/task-manager-plugin/server/task'; +import type { TaskContext } from './task_definitions'; + +export function cancellableTask( + run: RunFunction, + runContext: RunContext, + taskContext: TaskContext +) { + return async () => { + if (!runContext.fakeRequest) { + throw new Error('Request is required to run this task'); + } + + const { taskClient } = await taskContext.getScopedClients({ + request: runContext.fakeRequest, + }); + + try { + let intervalId: NodeJS.Timeout; + const cancellationPromise = new Promise<'canceled'>((resolve) => { + taskContext.logger.debug('Starting cancellable task check loop'); + intervalId = setInterval(async () => { + const task = await taskClient.get(runContext.taskInstance.id); + taskContext.logger.trace( + `Cancellable task check loop for task ${runContext.taskInstance.id}: status is ${task.status}` + ); + if (task.status === 'being_canceled') { + runContext.abortController.abort(); + await taskClient.update({ + ...task, + status: 'canceled', + }); + resolve('canceled' as const); + } + }, 5000); + }); + + taskContext.logger.debug( + `Running task ${runContext.taskInstance.id} with cancellation support (race)` + ); + const result = await Promise.race([run(), cancellationPromise]).finally(() => { + clearInterval(intervalId); + }); + + if (result === 'canceled') { + taskContext.logger.debug(`Task ${runContext.taskInstance.id} canceled`); + return undefined; + } + + taskContext.logger.debug(`Task ${runContext.taskInstance.id} completed`); + return result; + } catch (error) { + taskContext.logger.error(`Task ${runContext.taskInstance.id} failed unexpectedly`, { error }); + + try { + await taskClient.update({ + id: runContext.taskInstance.id, + status: 'failed', + task: { + params: {}, + error: error.message, + }, + created_at: new Date().toISOString(), + space: '', + type: '', + stream: '', + }); + } catch (updateError) { + taskContext.logger.error('Failed to update task status after error', { + error: updateError, + }); + } + + throw error; + } + }; +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/cancellation_in_progress_error.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/cancellation_in_progress_error.ts new file mode 100644 index 0000000000000..9ff5da034bde7 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/cancellation_in_progress_error.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class CancellationInProgressError extends Error { + constructor(message: string) { + super(message); + this.name = 'CancellationInProgressError'; + } +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/is_stale.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/is_stale.ts new file mode 100644 index 0000000000000..7edfe6c785722 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/is_stale.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +const fiveMinutesInMs = 5 * 60 * 1000; + +export function isStale(taskCreatedAt: string) { + const createdAt = new Date(taskCreatedAt).getTime(); + const now = Date.now(); + return now - createdAt > fiveMinutesInMs; +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_client.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_client.ts index 1eeb97ad37d1d..9f8335d4c40b0 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_client.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_client.ts @@ -10,6 +10,8 @@ import { TaskPriority, type TaskManagerStartContract } from '@kbn/task-manager-p import { isNotFoundError, isResponseError } from '@kbn/es-errors'; import type { TaskStorageClient } from './storage'; import type { PersistedTask, TaskParams } from './types'; +import { CancellationInProgressError } from './cancellation_in_progress_error'; +import { AcknowledgingIncompleteError } from './acknowledging_incomplete_error'; interface TaskRequest { task: Omit; @@ -24,7 +26,9 @@ export class TaskClient { private readonly logger: Logger ) {} - public async get(id: string): Promise> { + public async get( + id: string + ): Promise> { try { this.logger.debug(`Getting task ${id}`); @@ -37,7 +41,7 @@ export class TaskClient { throw new Error(`Task ${id} has no source`); } - return response._source as PersistedTask; + return response._source as PersistedTask; } catch (error) { if (isNotFoundError(error)) { return { @@ -47,6 +51,9 @@ export class TaskClient { space: '', stream: '', type: '', + task: { + params: {} as TParams, + }, }; } @@ -58,9 +65,17 @@ export class TaskClient { task, params, request, - }: TaskRequest): Promise { - const taskDoc: PersistedTask = { + }: TaskRequest) { + const storedTask = await this.get(task.id); + if (storedTask.status === 'being_canceled') { + throw new CancellationInProgressError('Previous task run is still being canceled'); + } + + const taskDoc: PersistedTask = { ...task, + task: { + params, + }, status: 'in_progress', created_at: new Date().toISOString(), }; @@ -94,13 +109,44 @@ export class TaskClient { throw error; } } + } + + public async cancel(id: string) { + this.logger.debug(`Canceling task ${id}`); + + const task = await this.get(id); + if (task.status !== 'in_progress') { + return; + } + + await this.update({ + ...task, + status: 'being_canceled', + }); + } + + public async acknowledge(id: string) { + const task = await this.get(id); + + if (task.status !== 'completed') { + throw new AcknowledgingIncompleteError('Only completed tasks can be acknowledged'); + } + + this.logger.debug(`Acknowledging task ${id}`); + + const taskDoc = { + ...task, + status: 'acknowledged' as const, + }; + + await this.update(taskDoc); return taskDoc; } - public async update( - task: PersistedTask - ): Promise> { + public async update( + task: PersistedTask + ) { this.logger.debug(`Updating task ${task.id}`); await this.storageClient.index({ @@ -109,7 +155,5 @@ export class TaskClient { // This might cause issues if there are many updates in a short time from multiple tasks running concurrently refresh: true, }); - - return task; } } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/feature_identification.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/feature_identification.ts new file mode 100644 index 0000000000000..375997ce8b79d --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/feature_identification.ts @@ -0,0 +1,151 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { TaskDefinitionRegistry } from '@kbn/task-manager-plugin/server'; +import { isInferenceProviderError } from '@kbn/inference-common'; +import { getStreamTypeFromDefinition, type FeatureType } from '@kbn/streams-schema'; +import { formatInferenceProviderError } from '../../../routes/utils/create_connector_sse_error'; +import type { TaskContext } from '.'; +import type { TaskParams } from '../types'; +import type { IdentifyFeaturesResult } from '../../streams/feature/feature_type_registry'; +import { getDefaultFeatureRegistry } from '../../streams/feature/feature_type_registry'; +import { PromptsConfigService } from '../../saved_objects/significant_events/prompts_config_service'; +import { cancellableTask } from '../cancellable_task'; + +export interface FeatureIdentificationTaskParams { + connectorId: string; + start: number; + end: number; +} + +export function createStreamsFeatureIdentificationTask(taskContext: TaskContext) { + return { + streams_feature_identification: { + createTaskRunner: (runContext) => { + return { + run: cancellableTask( + async () => { + if (!runContext.fakeRequest) { + throw new Error('Request is required to run this task'); + } + + const { connectorId, start, end, _task } = runContext.taskInstance + .params as TaskParams; + const { stream: name } = _task; + + const { + taskClient, + scopedClusterClient, + featureClient, + streamsClient, + inferenceClient, + soClient, + } = await taskContext.getScopedClients({ + request: runContext.fakeRequest, + }); + + try { + const [{ hits }, stream] = await Promise.all([ + featureClient.getFeatures(name), + streamsClient.getStream(name), + ]); + + const boundInferenceClient = inferenceClient.bindTo({ connectorId }); + const esClient = scopedClusterClient.asCurrentUser; + const featureRegistry = getDefaultFeatureRegistry(); + + const promptsConfigService = new PromptsConfigService({ + soClient, + logger: taskContext.logger, + }); + + const { featurePromptOverride, descriptionPromptOverride } = + await promptsConfigService.getPrompt(); + + const results = await featureRegistry.identifyFeatures({ + start, + end, + esClient, + inferenceClient: boundInferenceClient, + logger: taskContext.logger.get('feature_identification'), + stream, + features: hits, + signal: runContext.abortController.signal, + featurePromptOverride, + descriptionPromptOverride, + }); + + taskContext.telemetry.trackFeaturesIdentified({ + count: results.features.length, + count_by_type: results.features.reduce>( + (acc, feature) => { + acc[feature.type] = (acc[feature.type] || 0) + 1; + return acc; + }, + { + system: 0, + } + ), + stream_name: stream.name, + stream_type: getStreamTypeFromDefinition(stream), + input_tokens_used: results.tokensUsed.prompt, + output_tokens_used: results.tokensUsed.completion, + }); + + await taskClient.update({ + ..._task, + status: 'completed', + task: { + params: { + connectorId, + start, + end, + }, + payload: results, + }, + }); + } catch (error) { + // Get connector info for error enrichment + const connector = await inferenceClient.getConnectorById(connectorId); + + const errorMessage = isInferenceProviderError(error) + ? formatInferenceProviderError(error, connector) + : error.message; + + if ( + errorMessage.includes('ERR_CANCELED') || + errorMessage.includes('Request was aborted') + ) { + return; + } + + taskContext.logger.error( + `Task ${runContext.taskInstance.id} failed: ${errorMessage}` + ); + + await taskClient.update({ + ..._task, + status: 'failed', + task: { + params: { + connectorId, + start, + end, + }, + error: errorMessage, + }, + }); + } + }, + runContext, + taskContext + ), + }; + }, + }, + } satisfies TaskDefinitionRegistry; +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/index.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/index.ts index 4388c0cb0a4d4..7dfb546f194e8 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/index.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/index.ts @@ -5,15 +5,22 @@ * 2.0. */ +import type { Logger } from '@kbn/core/server'; import type { TaskDefinitionRegistry } from '@kbn/task-manager-plugin/server'; import type { GetScopedClients } from '../../../routes/types'; +import { createStreamsFeatureIdentificationTask } from './feature_identification'; +import type { EbtTelemetryClient } from '../../telemetry'; export interface TaskContext { + logger: Logger; getScopedClients: GetScopedClients; + telemetry: EbtTelemetryClient; } export function createTaskDefinitions(taskContext: TaskContext) { - return {} satisfies TaskDefinitionRegistry; + return { + ...createStreamsFeatureIdentificationTask(taskContext), + } satisfies TaskDefinitionRegistry; } export type StreamsTaskType = keyof ReturnType; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/types.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/types.ts index dd48d53d0abff..2cdb3b946b8cd 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/tasks/types.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/types.ts @@ -5,39 +5,68 @@ * 2.0. */ -interface PersistedTaskBase { +export type TaskStatus = + | 'not_started' + | 'in_progress' + | 'completed' + | 'acknowledged' + | 'failed' + | 'being_canceled' + | 'canceled'; + +interface PersistedTaskBase { id: string; type: string; - status: 'not_started' | 'in_progress' | 'completed' | 'failed'; + status: TaskStatus; stream: string; space: string; created_at: string; + task: { + params: TParams; + }; } -interface NotStartedTask extends PersistedTaskBase { +interface NotStartedTask extends PersistedTaskBase { status: 'not_started'; } -interface InProgressTask extends PersistedTaskBase { +interface InProgressTask extends PersistedTaskBase { status: 'in_progress'; } -interface CompletedTask extends PersistedTaskBase { +interface BeingCanceledTask extends PersistedTaskBase { + status: 'being_canceled'; +} +interface CanceledTask extends PersistedTaskBase { + status: 'canceled'; +} +interface CompletedTask + extends PersistedTaskBase { status: 'completed'; - task: { + task: PersistedTaskBase['task'] & { payload: TPayload; }; } -interface FailedTask extends PersistedTaskBase { +interface AcknowledgedTask + extends PersistedTaskBase { + status: 'acknowledged'; + task: PersistedTaskBase['task'] & { + payload: TPayload; + }; +} +interface FailedTask extends PersistedTaskBase { status: 'failed'; - task: { + task: PersistedTaskBase['task'] & { error: string; }; } -export type PersistedTask = - | NotStartedTask - | InProgressTask - | CompletedTask - | FailedTask; +export type PersistedTask = + | NotStartedTask + | InProgressTask + | CompletedTask + | AcknowledgedTask + | FailedTask + | BeingCanceledTask + | CanceledTask; export type TaskParams = TParams & { _task: PersistedTask; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/client.ts b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/client.ts index 821e092d2265e..f03dbc6215a47 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/client.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/client.ts @@ -6,8 +6,16 @@ */ import type { AnalyticsServiceSetup } from '@kbn/core-analytics-server'; -import type { StreamEndpointLatencyProps, StreamsStateErrorProps } from './types'; -import { STREAMS_ENDPOINT_LATENCY_EVENT, STREAMS_STATE_ERROR_EVENT } from './constants'; +import type { + StreamEndpointLatencyProps, + StreamsFeatureIdentificationIdentifiedProps, + StreamsStateErrorProps, +} from './types'; +import { + STREAMS_ENDPOINT_LATENCY_EVENT, + STREAMS_FEATURE_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, + STREAMS_STATE_ERROR_EVENT, +} from './constants'; const LATENCY_TRACKING_ENDPOINT_ALLOW_LIST = [ 'POST /api/streams/{name}/processing/_simulate 2023-10-31', @@ -53,4 +61,8 @@ export class EbtTelemetryClient { this.analytics.reportEvent(STREAMS_STATE_ERROR_EVENT, errorData); } + + public trackFeaturesIdentified(params: StreamsFeatureIdentificationIdentifiedProps) { + this.analytics.reportEvent(STREAMS_FEATURE_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, params); + } } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/constants.ts b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/constants.ts index 6239bba62ab12..91934eb5902c9 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/constants.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/constants.ts @@ -7,5 +7,11 @@ const STREAMS_ENDPOINT_LATENCY_EVENT = 'streams-endpoint-latency'; const STREAMS_STATE_ERROR_EVENT = 'streams-state-error'; +const STREAMS_FEATURE_IDENTIFICATION_IDENTIFIED_EVENT_TYPE = + 'streams-feature-identification-identified'; -export { STREAMS_ENDPOINT_LATENCY_EVENT, STREAMS_STATE_ERROR_EVENT }; +export { + STREAMS_ENDPOINT_LATENCY_EVENT, + STREAMS_STATE_ERROR_EVENT, + STREAMS_FEATURE_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, +}; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/events.ts b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/events.ts index fd037b0e2f65d..4ff6be60b4f5f 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/events.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/events.ts @@ -5,8 +5,16 @@ * 2.0. */ -import { STREAMS_ENDPOINT_LATENCY_EVENT, STREAMS_STATE_ERROR_EVENT } from './constants'; -import { streamsEndpointLatencySchema, streamsStateErrorSchema } from './schemas'; +import { + STREAMS_ENDPOINT_LATENCY_EVENT, + STREAMS_STATE_ERROR_EVENT, + STREAMS_FEATURE_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, +} from './constants'; +import { + streamsEndpointLatencySchema, + streamsStateErrorSchema, + streamsFeatureIdentificationIdentifiedSchema, +} from './schemas'; const streamsEndpointLatencyEventType = { eventType: STREAMS_ENDPOINT_LATENCY_EVENT, @@ -18,4 +26,13 @@ const streamsStateErrorEventType = { schema: streamsStateErrorSchema, }; -export { streamsEndpointLatencyEventType, streamsStateErrorEventType }; +const streamsFeatureIdentificationIdentifiedEventType = { + eventType: STREAMS_FEATURE_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, + schema: streamsFeatureIdentificationIdentifiedSchema, +}; + +export { + streamsEndpointLatencyEventType, + streamsStateErrorEventType, + streamsFeatureIdentificationIdentifiedEventType, +}; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/schemas.ts b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/schemas.ts index aed37acf3e53a..067248c475434 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/schemas.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/schemas.ts @@ -5,8 +5,13 @@ * 2.0. */ -import type { RootSchema } from '@elastic/ebt/client'; -import type { StreamEndpointLatencyProps, StreamsStateErrorProps } from './types'; +import type { RootSchema, SchemaObject } from '@elastic/ebt/client'; +import type { FeatureType } from '@kbn/streams-schema'; +import type { + StreamEndpointLatencyProps, + StreamsFeatureIdentificationIdentifiedProps, + StreamsStateErrorProps, +} from './types'; const streamsEndpointLatencySchema: RootSchema = { name: { @@ -61,4 +66,58 @@ const streamsStateErrorSchema: RootSchema = { }, }; -export { streamsEndpointLatencySchema, streamsStateErrorSchema }; +const countByTypes: SchemaObject<{ [key in FeatureType]: number }> = { + _meta: { + description: 'The count of identified features or significant events by type', + }, + properties: { + system: { + type: 'long', + _meta: { + description: + 'The count of system features or significant events generated by system features', + }, + }, + }, +}; + +const streamsFeatureIdentificationIdentifiedSchema: RootSchema = + { + count: { + type: 'long', + _meta: { + description: 'The number of features identified', + }, + }, + count_by_type: countByTypes, + input_tokens_used: { + type: 'long', + _meta: { + description: 'The number of input tokens used for the generation request', + }, + }, + output_tokens_used: { + type: 'long', + _meta: { + description: 'The number of output tokens used for the generation request', + }, + }, + stream_type: { + type: 'keyword', + _meta: { + description: 'The type of the stream: wired or classic', + }, + }, + stream_name: { + type: 'keyword', + _meta: { + description: 'The name of the Stream', + }, + }, + }; + +export { + streamsEndpointLatencySchema, + streamsStateErrorSchema, + streamsFeatureIdentificationIdentifiedSchema, +}; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/service.ts b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/service.ts index af520410ef2f3..f4eca902e1d9d 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/service.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/service.ts @@ -6,7 +6,11 @@ */ import type { AnalyticsServiceSetup } from '@kbn/core-analytics-server'; -import { streamsEndpointLatencyEventType, streamsStateErrorEventType } from './events'; +import { + streamsEndpointLatencyEventType, + streamsStateErrorEventType, + streamsFeatureIdentificationIdentifiedEventType, +} from './events'; import { EbtTelemetryClient } from './client'; export class EbtTelemetryService { @@ -18,6 +22,7 @@ export class EbtTelemetryService { this.analytics = analytics; this.analytics.registerEventType(streamsEndpointLatencyEventType); this.analytics.registerEventType(streamsStateErrorEventType); + this.analytics.registerEventType(streamsFeatureIdentificationIdentifiedEventType); } public getClient() { diff --git a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/types.ts b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/types.ts index 7dd157315ce29..a1917a45345d8 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/types.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/types.ts @@ -5,6 +5,8 @@ * 2.0. */ +import type { FeatureType, StreamType } from '@kbn/streams-schema'; + interface StreamEndpointLatencyProps { name: string; endpoint: string; @@ -20,4 +22,19 @@ interface StreamsStateErrorProps { status_code: number; } -export { type StreamEndpointLatencyProps, type StreamsStateErrorProps }; +type CountByFeatureType = Record; + +interface StreamsFeatureIdentificationIdentifiedProps { + count: number; + count_by_type: CountByFeatureType; + input_tokens_used: number; + output_tokens_used: number; + stream_name: string; + stream_type: StreamType; +} + +export { + type StreamEndpointLatencyProps, + type StreamsStateErrorProps, + type StreamsFeatureIdentificationIdentifiedProps, +}; diff --git a/x-pack/platform/plugins/shared/streams/server/plugin.ts b/x-pack/platform/plugins/shared/streams/server/plugin.ts index cac2a6e3bcd24..f294d04e1d4eb 100644 --- a/x-pack/platform/plugins/shared/streams/server/plugin.ts +++ b/x-pack/platform/plugins/shared/streams/server/plugin.ts @@ -170,7 +170,13 @@ export class StreamsPlugin }; }; - taskService.registerTasks({ getScopedClients }); + const telemetryClient = this.ebtTelemetryService.getClient(); + + taskService.registerTasks({ + getScopedClients, + logger: this.logger, + telemetry: telemetryClient, + }); plugins.features.registerKibanaFeature({ id: STREAMS_FEATURE_ID, @@ -226,7 +232,7 @@ export class StreamsPlugin dependencies: { features: featureService, server: this.server, - telemetry: this.ebtTelemetryService.getClient(), + telemetry: telemetryClient, processorSuggestions: this.processorSuggestionsService, getScopedClients, }, diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/features/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/features/route.ts index 52661bce64514..92fb96426e5b8 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/features/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/features/route.ts @@ -19,19 +19,25 @@ import type { } from '@kbn/storage-adapter'; import { generateStreamDescription, sumTokens } from '@kbn/streams-ai'; import type { Observable } from 'rxjs'; -import { from, map, catchError } from 'rxjs'; +import { catchError, from, map } from 'rxjs'; +import { BooleanFromString } from '@kbn/zod-helpers'; +import { conflict } from '@hapi/boom'; +import type { IdentifyFeaturesResult } from '../../../../lib/streams/feature/feature_type_registry'; +import { AcknowledgingIncompleteError } from '../../../../lib/tasks/acknowledging_incomplete_error'; +import { CancellationInProgressError } from '../../../../lib/tasks/cancellation_in_progress_error'; +import { isStale } from '../../../../lib/tasks/is_stale'; import { PromptsConfigService } from '../../../../lib/saved_objects/significant_events/prompts_config_service'; -import { createConnectorSSEError } from '../../../utils/create_connector_sse_error'; +import type { FeatureIdentificationTaskParams } from '../../../../lib/tasks/task_definitions/feature_identification'; import { resolveConnectorId } from '../../../utils/resolve_connector_id'; import { StatusError } from '../../../../lib/streams/errors/status_error'; -import { getDefaultFeatureRegistry } from '../../../../lib/streams/feature/feature_type_registry'; import { createServerRoute } from '../../../create_server_route'; import { checkAccess } from '../../../../lib/streams/stream_crud'; import { SecurityError } from '../../../../lib/streams/errors/security_error'; import { STREAMS_API_PRIVILEGES } from '../../../../../common/constants'; import { assertSignificantEventsAccess } from '../../../utils/assert_significant_events_access'; -import type { IdentifiedFeaturesEvent, StreamDescriptionEvent } from './types'; +import type { StreamDescriptionEvent } from './types'; import { getRequestAbortSignal } from '../../../utils/get_request_abort_signal'; +import { createConnectorSSEError } from '../../../utils/create_connector_sse_error'; const dateFromString = z.string().transform((input) => new Date(input)); @@ -284,6 +290,21 @@ export const bulkFeaturesRoute = createServerRoute({ }, }); +export type FeatureIdentificationTaskResult = + | { + status: 'not_started' | 'in_progress' | 'stale' | 'being_canceled' | 'canceled'; + } + | { + status: 'failed'; + error: string; + } + | ({ + status: 'completed'; + } & IdentifyFeaturesResult) + | ({ + status: 'acknowledged'; + } & IdentifyFeaturesResult); + export const identifyFeaturesRoute = createServerRoute({ endpoint: 'POST /internal/streams/{name}/features/_identify', options: { @@ -293,7 +314,7 @@ export const identifyFeaturesRoute = createServerRoute({ }, security: { authz: { - requiredPrivileges: [STREAMS_API_PRIVILEGES.read], + requiredPrivileges: [STREAMS_API_PRIVILEGES.manage], }, }, params: z.object({ @@ -307,6 +328,9 @@ export const identifyFeaturesRoute = createServerRoute({ ), from: dateFromString, to: dateFromString, + schedule: BooleanFromString.optional(), + cancel: BooleanFromString.optional(), + acknowledge: BooleanFromString.optional(), }), }), handler: async ({ @@ -315,18 +339,12 @@ export const identifyFeaturesRoute = createServerRoute({ getScopedClients, server, logger, - }): Promise> => { - const { - featureClient, - scopedClusterClient, - licensing, - uiSettingsClient, - streamsClient, - inferenceClient, - soClient, - } = await getScopedClients({ - request, - }); + }): Promise => { + const { scopedClusterClient, licensing, uiSettingsClient, taskClient } = await getScopedClients( + { + request, + } + ); await assertSignificantEventsAccess({ server, licensing, uiSettingsClient }); @@ -347,52 +365,88 @@ export const identifyFeaturesRoute = createServerRoute({ logger, }); - // Get connector info for error enrichment - const connector = await inferenceClient.getConnectorById(connectorId); + if (params.query.schedule) { + try { + await taskClient.schedule({ + task: { + type: 'streams_feature_identification', + id: `streams_feature_identification_${name}`, + space: '*', + stream: name, + }, + params: { + connectorId, + start: start.getTime(), + end: end.getTime(), + }, + request, + }); - const [{ hits }, stream] = await Promise.all([ - featureClient.getFeatures(name), - streamsClient.getStream(name), - ]); + return { + status: 'in_progress', + }; + } catch (error) { + if (error instanceof CancellationInProgressError) { + throw conflict(error.message); + } + + throw error; + } + } else if (params.query.cancel) { + await taskClient.cancel(`streams_feature_identification_${name}`); + + return { + status: 'being_canceled', + }; + } else if (params.query.acknowledge) { + try { + const task = await taskClient.acknowledge< + FeatureIdentificationTaskParams, + IdentifyFeaturesResult + >(`streams_feature_identification_${name}`); - const esClient = scopedClusterClient.asCurrentUser; + return { + status: 'acknowledged', + ...task.task.payload, + }; + } catch (error) { + if (error instanceof AcknowledgingIncompleteError) { + throw conflict(error.message); + } - const boundInferenceClient = inferenceClient.bindTo({ connectorId }); - const signal = getRequestAbortSignal(request); - const featureRegistry = getDefaultFeatureRegistry(); - const promptsConfigService = new PromptsConfigService({ - soClient, - logger, - }); + throw error; + } + } - const { featurePromptOverride, descriptionPromptOverride } = - await promptsConfigService.getPrompt(); + const task = await taskClient.get( + `streams_feature_identification_${name}` + ); - return from( - featureRegistry.identifyFeatures({ - start: start.getTime(), - end: end.getTime(), - esClient, - inferenceClient: boundInferenceClient, - logger: logger.get('feature_identification'), - stream, - features: hits, - signal, - featurePromptOverride, - descriptionPromptOverride, - }) - ).pipe( - map(({ features, tokensUsed }) => { + if (task.status === 'in_progress') { + if (isStale(task.created_at)) { return { - type: 'identified_features' as const, - features, - tokensUsed, + status: 'stale', }; - }), - catchError((error: Error) => { - throw createConnectorSSEError(error, connector); - }) - ); + } + + return { + status: 'in_progress', + }; + } else if (task.status === 'failed') { + return { + status: 'failed', + error: task.task.error, + }; + } else if (task.status === 'completed' || task.status === 'acknowledged') { + return { + status: task.status, + ...task.task.payload, + }; + } + + return { + status: task.status, + }; }, }); diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/features/types.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/features/types.ts index c7b7f96eddf9a..c05c8795f9e95 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/features/types.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/features/types.ts @@ -7,12 +7,6 @@ import type { ChatCompletionTokenCount } from '@kbn/inference-common'; import type { ServerSentEventBase } from '@kbn/sse-utils'; -import type { Feature } from '@kbn/streams-schema'; - -export type IdentifiedFeaturesEvent = ServerSentEventBase< - 'identified_features', - { features: Feature[]; tokensUsed: ChatCompletionTokenCount } ->; export type StreamDescriptionEvent = ServerSentEventBase< 'stream_description', diff --git a/x-pack/platform/plugins/shared/streams/server/routes/utils/create_connector_sse_error.ts b/x-pack/platform/plugins/shared/streams/server/routes/utils/create_connector_sse_error.ts index a860f8f604280..e2c6b57cdbb81 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/utils/create_connector_sse_error.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/utils/create_connector_sse_error.ts @@ -5,6 +5,7 @@ * 2.0. */ +import type { InferenceTaskProviderError } from '@kbn/inference-common'; import { getConnectorModel, isInferenceProviderError, @@ -61,22 +62,29 @@ function getErrorMessage(message: string, status?: number): string { return message; } +export function formatInferenceProviderError( + error: InferenceTaskProviderError, + connector: InferenceConnector +): string { + const model = getConnectorModel(connector); + const cause = getErrorMessage(error.message, error.meta?.status); + + const lines = [ + `Connector: ${connector.name}`, + model ? `Model: ${model}` : null, + `Cause: ${cause}`, + ].filter(Boolean); + + return lines.join('\n'); +} + /** * Creates a user-friendly SSE error with connector information. * Use this in catchError handlers for streaming endpoints that call LLM connectors. */ export function createConnectorSSEError(error: Error, connector: InferenceConnector): Error { if (isInferenceProviderError(error)) { - const model = getConnectorModel(connector); - const cause = getErrorMessage(error.message, error.meta?.status); - - const lines = [ - `Connector: ${connector.name}`, - model ? `Model: ${model}` : null, - `Cause: ${cause}`, - ].filter(Boolean); - - return createSSEInternalError(lines.join('\n')); + return createSSEInternalError(formatInferenceProviderError(error, connector)); } // For non-provider errors, just return the message diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/advanced_view/classic_advanced_view.test.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/advanced_view/classic_advanced_view.test.tsx index 5463f8d72ebd5..f9ec7b1d166ce 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/advanced_view/classic_advanced_view.test.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/advanced_view/classic_advanced_view.test.tsx @@ -57,6 +57,13 @@ jest.mock('../../../../hooks/use_ai_features', () => ({ jest.mock('../../../../hooks/use_stream_features_api', () => ({ useStreamFeaturesApi: () => ({ identifyFeatures: jest.fn(), + getFeatureIdentificationTask: jest.fn().mockResolvedValue({ status: 'idle' }), + scheduleFeatureIdentificationTask: jest.fn(), + cancelFeatureIdentificationTask: jest.fn(), + acknowledgeFeatureIdentificationTask: jest.fn(), + addFeaturesToStream: jest.fn(), + removeFeaturesFromStream: jest.fn(), + upsertFeature: jest.fn(), abort: jest.fn(), }), })); @@ -215,8 +222,6 @@ describe('ClassicAdvancedView', () => { // Check the Feature identification panel title is rendered expect(screen.getByText('Feature identification')).toBeInTheDocument(); - // Check the Identify features button is rendered - expect(screen.getByRole('button', { name: /identify features/i })).toBeInTheDocument(); }); it('should NOT render Stream description or Feature identification when significantEvents is disabled', () => { diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/advanced_view/wired_advanced_view.test.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/advanced_view/wired_advanced_view.test.tsx index 7d01898b7c628..736cf317b3a17 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/advanced_view/wired_advanced_view.test.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_management/advanced_view/wired_advanced_view.test.tsx @@ -57,6 +57,13 @@ jest.mock('../../../../hooks/use_ai_features', () => ({ jest.mock('../../../../hooks/use_stream_features_api', () => ({ useStreamFeaturesApi: () => ({ identifyFeatures: jest.fn(), + getFeatureIdentificationTask: jest.fn().mockResolvedValue({ status: 'idle' }), + scheduleFeatureIdentificationTask: jest.fn(), + cancelFeatureIdentificationTask: jest.fn(), + acknowledgeFeatureIdentificationTask: jest.fn(), + addFeaturesToStream: jest.fn(), + removeFeaturesFromStream: jest.fn(), + upsertFeature: jest.fn(), abort: jest.fn(), }), })); @@ -225,8 +232,6 @@ describe('WiredAdvancedView', () => { // Check the Feature identification panel title is rendered expect(screen.getByText('Feature identification')).toBeInTheDocument(); - // Check the Identify features button is rendered - expect(screen.getByRole('button', { name: /identify features/i })).toBeInTheDocument(); }); it('should NOT render Stream description or Feature identification when significantEvents is disabled', () => { diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_features/stream_feature_configuration.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_features/stream_feature_configuration.tsx index f74b9c5dff3e8..058242f6ee365 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_features/stream_feature_configuration.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_features/stream_feature_configuration.tsx @@ -4,18 +4,23 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ -import React, { useState } from 'react'; +import React from 'react'; import { i18n } from '@kbn/i18n'; -import type { Streams, Feature } from '@kbn/streams-schema'; -import { EuiPanel, EuiText, EuiFlexGroup, EuiFlexItem, EuiSpacer } from '@elastic/eui'; +import type { Streams } from '@kbn/streams-schema'; +import { + EuiPanel, + EuiText, + EuiFlexGroup, + EuiFlexItem, + EuiSpacer, + EuiBadge, + EuiToolTip, +} from '@elastic/eui'; import { useStreamFeatures } from './stream_features/hooks/use_stream_features'; -import type { AIFeatures } from '../../hooks/use_ai_features'; -import { useStreamFeaturesApi } from '../../hooks/use_stream_features_api'; -import { StreamFeaturesFlyout } from './stream_features/stream_features_flyout'; import { StreamFeaturesAccordion } from './stream_features/stream_features_accordion'; import { Row } from '../data_management/stream_detail_management/advanced_view/row'; -import { ConnectorListButtonBase } from '../connector_list_button/connector_list_button'; -import { useKibana } from '../../hooks/use_kibana'; +import { FeatureIdentificationControl } from '../stream_detail_significant_events_view/feature_identification_control'; +import type { AIFeatures } from '../../hooks/use_ai_features'; interface StreamConfigurationProps { definition: Streams.all.Definition; @@ -23,30 +28,47 @@ interface StreamConfigurationProps { } export function StreamFeatureConfiguration({ definition, aiFeatures }: StreamConfigurationProps) { - const { - core: { notifications }, - } = useKibana(); - const [isFlyoutVisible, setIsFlyoutVisible] = useState(false); - const { identifyFeatures, abort } = useStreamFeaturesApi(definition); - const [features, setFeatures] = useState([]); const { features: existingFeatures, refreshFeatures, featuresLoading, } = useStreamFeatures(definition); - const [isLoading, setIsLoading] = useState(false); - return ( - -

- {i18n.translate('xpack.streams.streamDetailView.configurationTitle', { - defaultMessage: 'Feature identification', - })} -

-
+ + + +

+ {i18n.translate('xpack.streams.streamDetailView.configurationTitle', { + defaultMessage: 'Feature identification', + })} +

+
+
+ + + + {i18n.translate( + 'xpack.streams.streamFeatureConfiguration.backgroundTaskBadgeLabel', + { + defaultMessage: 'Background task', + } + )} + + + +
@@ -63,40 +85,9 @@ export function StreamFeatureConfiguration({ definition, aiFeatures }: StreamCon right={ - { - setIsLoading(true); - setIsFlyoutVisible(!isFlyoutVisible); - identifyFeatures(aiFeatures?.genAiConnectors.selectedConnector!) - .then((data) => { - setFeatures(data.features); - }) - .catch((error) => { - if (error.name === 'AbortError') { - return; - } - notifications.toasts.addError(error, { - title: i18n.translate( - 'xpack.streams.streamDetailView.featureIdentification.errorTitle', - { defaultMessage: 'Failed to identify features' } - ), - }); - }) - .finally(() => { - setIsLoading(false); - }); - }, - 'data-test-subj': 'feature_identification_identify_features_button', - children: i18n.translate( - 'xpack.streams.streamDetailView.featureIdentificationButtonLabel', - { - defaultMessage: 'Identify features', - } - ), - }} + @@ -115,19 +106,6 @@ export function StreamFeatureConfiguration({ definition, aiFeatures }: StreamCon /> )} - {isFlyoutVisible && ( - { - abort(); - refreshFeatures(); - setIsFlyoutVisible(false); - }} - setFeatures={setFeatures} - /> - )}
diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_features/stream_features/stream_features_flyout.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_features/stream_features/stream_features_flyout.tsx index 808d7418266e2..a853c8173c1e6 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_features/stream_features/stream_features_flyout.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_features/stream_features/stream_features_flyout.tsx @@ -15,32 +15,35 @@ import { EuiFlyoutBody, EuiFlyoutFooter, EuiFlyoutHeader, - EuiSpacer, - EuiText, EuiTitle, - EuiLoadingElastic, } from '@elastic/eui'; import type { Streams, Feature } from '@kbn/streams-schema'; import { FormattedMessage } from '@kbn/i18n-react'; import { i18n } from '@kbn/i18n'; import { css } from '@emotion/css'; -import { useWaitingForAiMessage } from '../../../hooks/use_waiting_for_ai_message'; +import { useKibana } from '../../../hooks/use_kibana'; import { useStreamFeaturesApi } from '../../../hooks/use_stream_features_api'; import { StreamFeaturesTable } from './stream_features_table'; export const StreamFeaturesFlyout = ({ - features, - closeFlyout, - isLoading, definition, + features, setFeatures, + closeFlyout, + onFeaturesAdded, + onFeaturesDiscarded, }: { - isLoading: boolean; - features: Feature[]; - closeFlyout: () => void; definition: Streams.all.Definition; + features: Feature[]; setFeatures: React.Dispatch>; + closeFlyout: () => void; + onFeaturesAdded: () => void; + onFeaturesDiscarded: () => void; }) => { + const { + core: { notifications }, + } = useKibana(); + const [selectedFeatureNames, setSelectedFeatureNames] = useState>(new Set()); const { addFeaturesToStream } = useStreamFeaturesApi(definition); const [isUpdating, setIsUpdating] = useState(false); @@ -77,17 +80,13 @@ export const StreamFeaturesFlyout = ({ } `} > - {!isLoading ? ( - - ) : ( - - )} + @@ -111,60 +110,52 @@ export const StreamFeaturesFlyout = ({ - { - setIsUpdating(true); - addFeaturesToStream(selectedFeatures).finally(() => { - closeFlyout(); - setIsUpdating(false); - }); - }} - fill - isDisabled={selectedFeatureNames.size === 0} - > - - + + + { + onFeaturesDiscarded(); + }} + > + + + + + { + setIsUpdating(true); + addFeaturesToStream(selectedFeatures).finally(() => { + notifications.toasts.addSuccess({ + title: i18n.translate( + 'xpack.streams.streamFeaturesFlyout.addFeaturesSuccessToastTitle', + { + defaultMessage: + '{count} {count, plural, one {feature} other {features}} added to stream', + values: { count: selectedFeatures.length }, + } + ), + }); + onFeaturesAdded(); + setIsUpdating(false); + }); + }} + fill + isDisabled={selectedFeatureNames.size === 0} + > + + + + ); }; - -function LoadingState({ closeFlyout }: { closeFlyout: () => void }) { - const label = useWaitingForAiMessage(); - - return ( - - - - - {label} - - - - - - - - ); -} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/add_significant_event_flyout.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/add_significant_event_flyout.tsx index 48f2cb4ec897b..d87775b3a6999 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/add_significant_event_flyout.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/add_significant_event_flyout.tsx @@ -57,7 +57,7 @@ interface Props { query?: StreamQueryKql; initialFlow?: Flow; initialSelectedFeatures: Feature[]; - onFeatureIdentificationClick: () => void; + refreshFeatures: () => void; generateOnMount: boolean; aiFeatures: AIFeatures | null; } @@ -72,7 +72,7 @@ export function AddSignificantEventFlyout({ initialFlow = undefined, initialSelectedFeatures, features, - onFeatureIdentificationClick, + refreshFeatures, aiFeatures, }: Props) { const { euiTheme } = useEuiTheme(); @@ -305,7 +305,8 @@ export function AddSignificantEventFlyout({ selectedFeatures={selectedFeatures} onFeaturesChange={setSelectedFeatures} onGenerateSuggestionsClick={generateQueries} - onFeatureIdentificationClick={onFeatureIdentificationClick} + definition={definition.stream} + refreshFeatures={refreshFeatures} isGeneratingQueries={isGenerating} isSavingManualEntry={isSubmitting} selectedFlow={selectedFlow} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/edit_significant_event_flyout.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/edit_significant_event_flyout.tsx index 4eab58d220724..694368a6d884d 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/edit_significant_event_flyout.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/edit_significant_event_flyout.tsx @@ -27,7 +27,7 @@ export const EditSignificantEventFlyout = ({ setQueryToEdit, features, refresh, - onFeatureIdentificationClick, + refreshFeatures, generateOnMount, aiFeatures, }: { @@ -42,7 +42,7 @@ export const EditSignificantEventFlyout = ({ definition: Streams.all.GetResponse; isEditFlyoutOpen: boolean; setIsEditFlyoutOpen: React.Dispatch>; - onFeatureIdentificationClick: () => void; + refreshFeatures: () => void; generateOnMount: boolean; aiFeatures: AIFeatures | null; }) => { @@ -65,7 +65,7 @@ export const EditSignificantEventFlyout = ({ void; + definition: Streams.all.Definition; + refreshFeatures: () => void; onManualEntryClick: () => void; onGenerateSuggestionsClick: () => void; aiFeatures: AIFeatures | null; @@ -53,7 +56,8 @@ export function EmptyState({ selectedFeatures={selectedFeatures} onFeaturesChange={onFeaturesChange} onGenerateSuggestionsClick={onGenerateSuggestionsClick} - onFeatureIdentificationClick={onFeatureIdentificationClick} + definition={definition} + refreshFeatures={refreshFeatures} onManualEntryClick={onManualEntryClick} isGeneratingQueries={false} isSavingManualEntry={false} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/feature_identification_control.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/feature_identification_control.tsx new file mode 100644 index 0000000000000..5a23d1c499122 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/feature_identification_control.tsx @@ -0,0 +1,278 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Feature, Streams } from '@kbn/streams-schema'; +import React, { useEffect, useState } from 'react'; +import useAsyncFn from 'react-use/lib/useAsyncFn'; +import { EuiButton, EuiButtonEmpty, EuiCallOut, EuiFlexGroup, EuiFlexItem } from '@elastic/eui'; +import { i18n } from '@kbn/i18n'; +import type { AIFeatures } from '../../hooks/use_ai_features'; +import { useStreamFeaturesApi } from '../../hooks/use_stream_features_api'; +import { useTaskPolling } from '../../hooks/use_task_polling'; +import { StreamFeaturesFlyout } from '../stream_detail_features/stream_features/stream_features_flyout'; +import { ConnectorListButton } from '../connector_list_button/connector_list_button'; + +interface FeatureIdentificationControlProps { + definition: Streams.all.Definition; + refreshFeatures: () => void; + aiFeatures: AIFeatures | null; + disabled?: boolean; +} + +export function FeatureIdentificationControl({ + definition, + refreshFeatures, + aiFeatures, + disabled = false, +}: FeatureIdentificationControlProps) { + const [isFlyoutVisible, setIsFlyoutVisible] = useState(false); + + const [features, setFeatures] = useState([]); + const [isLoading, setIsLoading] = useState(false); + + const { + getFeatureIdentificationTask, + scheduleFeatureIdentificationTask, + cancelFeatureIdentificationTask, + acknowledgeFeatureIdentificationTask, + } = useStreamFeaturesApi(definition); + + const [{ loading, value: task, error }, getTask] = useAsyncFn(getFeatureIdentificationTask); + useEffect(() => { + getTask(); + }, [getTask]); + useTaskPolling(task, getFeatureIdentificationTask, getTask); + + const flyout = isFlyoutVisible && ( + { + setIsFlyoutVisible(false); + }} + onFeaturesAdded={() => { + setIsFlyoutVisible(false); + acknowledgeFeatureIdentificationTask().then(getTask).then(refreshFeatures); + }} + onFeaturesDiscarded={() => { + setIsFlyoutVisible(false); + acknowledgeFeatureIdentificationTask().then(getTask); + }} + /> + ); + + if (error) { + return ( + + {error.message} + + ); + } + + if (task === undefined) { + return null; + } + + const triggerButton = ( + { + setIsLoading(true); + scheduleFeatureIdentificationTask(aiFeatures?.genAiConnectors.selectedConnector!).then( + () => { + setIsLoading(false); + getTask(); + } + ); + }, + 'data-test-subj': 'feature_identification_identify_features_button', + children: i18n.translate( + 'xpack.streams.streamDetailView.featureIdentificationButtonLabel', + { + defaultMessage: 'Identify features', + } + ), + }} + /> + ); + + if ( + task.status === 'not_started' || + task.status === 'acknowledged' || + task.status === 'canceled' + ) { + return triggerButton; + } + + if (task.status === 'in_progress') { + return ( + + + + {i18n.translate( + 'xpack.streams.streamDetailView.featureIdentificationButtonInProgressLabel', + { + defaultMessage: 'Feature identification in progress', + } + )} + + + + { + cancelFeatureIdentificationTask().then(() => { + getTask(); + }); + }} + > + {i18n.translate( + 'xpack.streams.streamDetailView.cancelFeatureIdentificationButtonLabel', + { + defaultMessage: 'Cancel', + } + )} + + + + ); + } + + if (task.status === 'being_canceled') { + return ( + + ); + } + + if (task.status === 'completed') { + if (task.features.length === 0) { + return ( + + {triggerButton} + + { + acknowledgeFeatureIdentificationTask().then(getTask); + }} + > + {i18n.translate('xpack.streams.streamDetailView.noFeaturesIdentifiedDescription', { + defaultMessage: + "The feature identification task didn't find any new features in your data. You can try again with different AI connector settings or try later with new data ingested.", + })} + + + + ); + } + + return ( + <> + { + setFeatures(task.features); + setIsFlyoutVisible(true); + }} + data-test-subj="feature_identification_review_features_button" + > + {i18n.translate('xpack.streams.streamDetailView.reviewIdentifiedFeaturesButtonLabel', { + defaultMessage: + 'Review {count} identified {count, plural, one {feature} other {features}}', + values: { count: task.features.length }, + })} + + {flyout} + + ); + } + + if (task.status === 'failed') { + return ( + + {triggerButton} + + + {task.error} + + + + ); + } + + if (task.status === 'stale') { + return ( + + {triggerButton} + + + {i18n.translate( + 'xpack.streams.streamDetailView.featureIdentificationTaskStaledDescription', + { + defaultMessage: + "The feature identification task didn't report its status for a prolonged period and is considered stale. Please start a new task.", + } + )} + + + + ); + } +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/generation_panel.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/generation_panel.tsx index d228bcdb3d2b6..82bebbed15b20 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/generation_panel.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/generation_panel.tsx @@ -16,7 +16,7 @@ import { EuiText, EuiTitle, } from '@elastic/eui'; -import type { Feature } from '@kbn/streams-schema'; +import type { Feature, Streams } from '@kbn/streams-schema'; import { i18n } from '@kbn/i18n'; import type { FeatureSelectorProps } from './feature_selector'; import { FeaturesSelector } from './feature_selector'; @@ -24,20 +24,23 @@ import { AssetImage } from '../asset_image'; import { ConnectorListButtonBase } from '../connector_list_button/connector_list_button'; import type { Flow } from './add_significant_event_flyout/types'; import type { AIFeatures } from '../../hooks/use_ai_features'; +import { FeatureIdentificationControl } from './feature_identification_control'; export function SignificantEventsGenerationPanel({ features, selectedFeatures, onFeaturesChange, + definition, + refreshFeatures, onGenerateSuggestionsClick, - onFeatureIdentificationClick, onManualEntryClick, isGeneratingQueries, isSavingManualEntry, selectedFlow, aiFeatures, }: FeatureSelectorProps & { - onFeatureIdentificationClick: () => void; + definition: Streams.all.Definition; + refreshFeatures: () => void; onManualEntryClick: () => void; onGenerateSuggestionsClick: (features: Feature[]) => void; isGeneratingQueries: boolean; @@ -55,10 +58,11 @@ export function SignificantEventsGenerationPanel({ {features.length === 0 ? ( ) : ( void; + definition: Streams.all.Definition; + refreshFeatures: () => void; + aiFeatures: AIFeatures | null; isGeneratingQueries: boolean; isSavingManualEntry: boolean; - aiFeatures: AIFeatures | null; }) { return ( <> @@ -269,7 +275,7 @@ function IdentifyFeatures({ {i18n.translate( - 'xpack.streams.significantEvents.significantEventsGenerationPanel.indentifyFeaturesDescription', + 'xpack.streams.significantEvents.significantEventsGenerationPanel.identifyFeaturesDescription', { defaultMessage: 'Features are logical subsets of the data and they provide the best context for the generation of significant events. Identify features first. Generation uses the last 24 hours of data.', @@ -285,21 +291,12 @@ function IdentifyFeatures({ - - identifyFeatures(), - 'data-test-subj': 'significant_events_identify_features_button', - isDisabled: isGeneratingQueries || isSavingManualEntry, - children: i18n.translate( - 'xpack.streams.significantEvents.significantEventsGenerationPanel.identifyFeaturesButtonLabel', - { - defaultMessage: 'Identify features', - } - ), - }} + + diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/index.tsx index ede1bcdee73a1..cf68c4e808bc9 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/index.tsx @@ -10,8 +10,7 @@ import { i18n } from '@kbn/i18n'; import type { Streams, StreamQueryKql, Feature } from '@kbn/streams-schema'; import type { TimeRange } from '@kbn/es-query'; import { compact, isEqual } from 'lodash'; -import React, { useCallback, useEffect, useMemo, useState } from 'react'; -import { StreamFeaturesFlyout } from '../stream_detail_features/stream_features/stream_features_flyout'; +import React, { useEffect, useMemo, useState } from 'react'; import { useStreamFeatures } from '../stream_detail_features/stream_features/hooks/use_stream_features'; import { useKibana } from '../../hooks/use_kibana'; import { EditSignificantEventFlyout } from './edit_significant_event_flyout'; @@ -22,14 +21,13 @@ import { LoadingPanel } from '../loading_panel'; import type { Flow } from './add_significant_event_flyout/types'; import { SignificantEventsTable } from './significant_events_table'; import { EmptyState } from './empty_state'; -import { useAIFeatures } from '../../hooks/use_ai_features'; import { OPEN_SIGNIFICANT_EVENTS_FLYOUT_URL_PARAM, SELECTED_FEATURES_URL_PARAM, } from '../../constants'; -import { useStreamFeaturesApi } from '../../hooks/use_stream_features_api'; import { SignificantEventsHistogramChart } from './significant_events_histogram'; import { formatChangePoint } from './utils/change_point'; +import { useAIFeatures } from '../../hooks/use_ai_features'; interface Props { definition: Streams.all.GetResponse; @@ -39,13 +37,11 @@ interface Props { export function StreamDetailSignificantEventsView({ definition, refreshDefinition }: Props) { const { timeState, setTime, refresh } = useTimefilter(); const { - core: { notifications }, dependencies: { start: { unifiedSearch }, }, } = useKibana(); const { euiTheme } = useEuiTheme(); - const aiFeatures = useAIFeatures(); const xFormatter = useMemo(() => { @@ -53,10 +49,6 @@ export function StreamDetailSignificantEventsView({ definition, refreshDefinitio }, [timeState.start, timeState.end]); const { features, refreshFeatures, featuresLoading } = useStreamFeatures(definition.stream); - const { identifyFeatures, abort } = useStreamFeaturesApi(definition.stream); - const [isFeatureDetectionFlyoutOpen, setIsFeatureDetectionFlyoutOpen] = useState(false); - const [isFeatureDetectionLoading, setIsFeatureDetectionLoading] = useState(false); - const [detectedFeatures, setDetectedFeatures] = useState([]); const [query, setQuery] = useState(''); const significantEventsFetchState = useFetchSignificantEvents({ @@ -74,36 +66,6 @@ export function StreamDetailSignificantEventsView({ definition, refreshDefinitio const [queryToEdit, setQueryToEdit] = useState(); const [dateRange, setDateRange] = useState(timeState.timeRange); - const identifyFeaturesCallback = useCallback(() => { - setIsFeatureDetectionLoading(true); - setIsFeatureDetectionFlyoutOpen(true); - - identifyFeatures(aiFeatures?.genAiConnectors.selectedConnector!) - .then((data) => { - setDetectedFeatures(data.features); - }) - .catch((error) => { - if (error.name === 'AbortError') { - return; - } - notifications.toasts.addError(error, { - title: i18n.translate('xpack.streams.streamDetailView.featureIdentification.errorTitle', { - defaultMessage: 'Failed to identify features', - }), - }); - }) - .finally(() => { - setIsFeatureDetectionLoading(false); - }); - }, [ - identifyFeatures, - aiFeatures?.genAiConnectors.selectedConnector, - setIsFeatureDetectionLoading, - setIsFeatureDetectionFlyoutOpen, - setDetectedFeatures, - notifications.toasts, - ]); - useEffect(() => { const urlParams = new URLSearchParams(window.location.search); if (urlParams.get(OPEN_SIGNIFICANT_EVENTS_FLYOUT_URL_PARAM) === 'true' && features.length > 0) { @@ -134,20 +96,6 @@ export function StreamDetailSignificantEventsView({ definition, refreshDefinitio return ; } - const featureDetectionFlyout = isFeatureDetectionFlyoutOpen ? ( - { - abort(); - refreshFeatures(); - setIsFeatureDetectionFlyoutOpen(false); - }} - setFeatures={setDetectedFeatures} - /> - ) : null; - const editFlyout = (generateOnMount: boolean) => ( @@ -180,7 +128,8 @@ export function StreamDetailSignificantEventsView({ definition, refreshDefinitio features={features} selectedFeatures={selectedFeatures} onFeaturesChange={setSelectedFeatures} - onFeatureIdentificationClick={identifyFeaturesCallback} + definition={definition.stream} + refreshFeatures={refreshFeatures} onManualEntryClick={() => { setQueryToEdit(undefined); setInitialFlow('manual'); @@ -192,7 +141,6 @@ export function StreamDetailSignificantEventsView({ definition, refreshDefinitio }} aiFeatures={aiFeatures} /> - {featureDetectionFlyout} {editFlyout(true)} ); @@ -305,7 +253,6 @@ export function StreamDetailSignificantEventsView({ definition, refreshDefinitio /> - {featureDetectionFlyout} {editFlyout(false)} ); diff --git a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_stream_features_api.ts b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_stream_features_api.ts index 8ae72e373f0aa..cadbe38a4d6a7 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_stream_features_api.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_stream_features_api.ts @@ -6,22 +6,22 @@ */ import { useAbortController } from '@kbn/react-hooks'; -import { firstValueFrom } from 'rxjs'; import type { Streams, Feature, FeatureType } from '@kbn/streams-schema'; -import type { IdentifiedFeaturesEvent } from '@kbn/streams-plugin/server/routes/internal/streams/features/types'; import type { StorageClientBulkResponse } from '@kbn/storage-adapter'; +import type { FeatureIdentificationTaskResult } from '@kbn/streams-plugin/server/routes/internal/streams/features/route'; import { useKibana } from './use_kibana'; import { getStreamTypeFromDefinition } from '../util/get_stream_type_from_definition'; -import { getLast24HoursTimeRange } from '../util/time_range'; interface StreamFeaturesApi { - upsertFeature: (feature: Feature) => Promise; - identifyFeatures: (connectorId: string) => Promise; + getFeatureIdentificationTask: () => Promise; + scheduleFeatureIdentificationTask: (connectorId: string) => Promise; + cancelFeatureIdentificationTask: () => Promise; + acknowledgeFeatureIdentificationTask: () => Promise; addFeaturesToStream: (features: Feature[]) => Promise; removeFeaturesFromStream: ( features: Pick[] ) => Promise; - abort: () => void; + upsertFeature: (feature: Feature) => Promise; } export function useStreamFeaturesApi(definition: Streams.all.Definition): StreamFeaturesApi { @@ -34,46 +34,67 @@ export function useStreamFeaturesApi(definition: Streams.all.Definition): Stream services: { telemetryClient }, } = useKibana(); - const { signal, abort, refresh } = useAbortController(); + const { signal } = useAbortController(); return { - identifyFeatures: async (connectorId: string) => { - const { from, to } = getLast24HoursTimeRange(); - const events$ = streamsRepositoryClient.stream( + getFeatureIdentificationTask: async () => { + return await streamsRepositoryClient.fetch( 'POST /internal/streams/{name}/features/_identify', { signal, params: { path: { name: definition.name }, query: { - connectorId, - from, - to, + connectorId: '', + to: '', + from: '', }, }, } ); - - const identifiedFeatures = await firstValueFrom(events$); - - telemetryClient.trackFeaturesIdentified({ - count: identifiedFeatures.features.length, - count_by_type: identifiedFeatures.features.reduce>( - (acc, feature) => { - acc[feature.type] = (acc[feature.type] || 0) + 1; - return acc; + }, + scheduleFeatureIdentificationTask: async (connectorId: string) => { + const now = Date.now(); + await streamsRepositoryClient.fetch('POST /internal/streams/{name}/features/_identify', { + signal, + params: { + path: { name: definition.name }, + query: { + schedule: true, + connectorId, + to: new Date(now).toISOString(), + from: new Date(now - 24 * 60 * 60 * 1000).toISOString(), }, - { - system: 0, - } - ), - stream_name: definition.name, - stream_type: getStreamTypeFromDefinition(definition), - input_tokens_used: identifiedFeatures.tokensUsed.prompt, - output_tokens_used: identifiedFeatures.tokensUsed.completion, + }, + }); + }, + cancelFeatureIdentificationTask: async () => { + await streamsRepositoryClient.fetch('POST /internal/streams/{name}/features/_identify', { + signal, + params: { + path: { name: definition.name }, + query: { + cancel: true, + connectorId: '', + to: '', + from: '', + }, + }, + }); + }, + acknowledgeFeatureIdentificationTask: async () => { + await streamsRepositoryClient.fetch('POST /internal/streams/{name}/features/_identify', { + signal, + params: { + path: { name: definition.name }, + query: { + acknowledge: true, + connectorId: '', + to: '', + from: '', + }, + }, }); - - return identifiedFeatures; }, addFeaturesToStream: async (features: Feature[]) => { telemetryClient.trackFeaturesSaved({ @@ -158,9 +179,5 @@ export function useStreamFeaturesApi(definition: Streams.all.Definition): Stream } ); }, - abort: () => { - abort(); - refresh(); - }, }; } diff --git a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_task_polling.ts b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_task_polling.ts new file mode 100644 index 0000000000000..9df1af40fb364 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_task_polling.ts @@ -0,0 +1,46 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { useEffect } from 'react'; + +interface TaskWithStatus { + status: string; +} + +export function useTaskPolling( + task: TaskWithStatus | undefined, + poll: () => Promise, + refresh: () => void +) { + useEffect(() => { + if (task?.status !== 'in_progress' && task?.status !== 'being_canceled') { + return; + } + + const startTime = Date.now(); + const maxDuration = 5 * 60 * 1000; + const pollInterval = 2000; + + const intervalId = setInterval(async () => { + if (Date.now() - startTime > maxDuration) { + clearInterval(intervalId); + return; + } + + const polledTask = await poll(); + + if (polledTask.status !== 'in_progress' && polledTask.status !== 'being_canceled') { + clearInterval(intervalId); + refresh(); + } + }, pollInterval); + + return () => { + clearInterval(intervalId); + }; + }, [task?.status, poll, refresh]); +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/telemetry/client.ts b/x-pack/platform/plugins/shared/streams_app/public/telemetry/client.ts index 4d673d753ce83..9e8938c0b8d13 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/telemetry/client.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/telemetry/client.ts @@ -25,7 +25,6 @@ import type { StreamsSignificantEventsSuggestionsGeneratedEventProps, WiredStreamsStatusChangedProps, StreamsFeatureIdentificationSavedProps, - StreamsFeatureIdentificationIdentifiedProps, StreamsFeatureIdentificationDeletedProps, StreamsDescriptionGeneratedProps, StreamsProcessingSimulationSamplesFetchLatencyProps, @@ -50,7 +49,6 @@ import { STREAMS_SIGNIFICANT_EVENTS_CREATED_EVENT_TYPE, STREAMS_SIGNIFICANT_EVENTS_SUGGESTIONS_GENERATED_EVENT_TYPE, STREAMS_WIRED_STREAMS_STATUS_CHANGED_EVENT_TYPE, - STREAMS_FEATURE_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, STREAMS_FEATURE_IDENTIFICATION_SAVED_EVENT_TYPE, STREAMS_FEATURE_IDENTIFICATION_DELETED_EVENT_TYPE, STREAMS_DESCRIPTION_GENERATED_EVENT_TYPE, @@ -128,10 +126,6 @@ export class StreamsTelemetryClient { this.analytics.reportEvent(STREAMS_SIGNIFICANT_EVENTS_CREATED_EVENT_TYPE, params); } - public trackFeaturesIdentified(params: StreamsFeatureIdentificationIdentifiedProps) { - this.analytics.reportEvent(STREAMS_FEATURE_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, params); - } - public trackFeaturesSaved(params: StreamsFeatureIdentificationSavedProps) { this.analytics.reportEvent(STREAMS_FEATURE_IDENTIFICATION_SAVED_EVENT_TYPE, params); } diff --git a/x-pack/platform/plugins/shared/streams_app/public/telemetry/constants.ts b/x-pack/platform/plugins/shared/streams_app/public/telemetry/constants.ts index df7375542abe3..e99baae229de7 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/telemetry/constants.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/telemetry/constants.ts @@ -23,8 +23,6 @@ const STREAMS_SCHEMA_UPDATED_EVENT_TYPE = 'streams-schema-updated'; const STREAMS_SIGNIFICANT_EVENTS_SUGGESTIONS_GENERATED_EVENT_TYPE = 'streams-significant-events-suggestions-generated'; const STREAMS_SIGNIFICANT_EVENTS_CREATED_EVENT_TYPE = 'streams-significant-events-created'; -const STREAMS_FEATURE_IDENTIFICATION_IDENTIFIED_EVENT_TYPE = - 'streams-feature-identification-identified'; const STREAMS_FEATURE_IDENTIFICATION_SAVED_EVENT_TYPE = 'streams-feature-identification-saved'; const STREAMS_FEATURE_IDENTIFICATION_DELETED_EVENT_TYPE = 'streams-feature-identification-deleted'; const STREAMS_DESCRIPTION_GENERATED_EVENT_TYPE = 'streams-description-generated'; @@ -52,7 +50,6 @@ export { STREAMS_SIGNIFICANT_EVENTS_SUGGESTIONS_GENERATED_EVENT_TYPE, STREAMS_SIGNIFICANT_EVENTS_CREATED_EVENT_TYPE, STREAMS_WIRED_STREAMS_STATUS_CHANGED_EVENT_TYPE, - STREAMS_FEATURE_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, STREAMS_FEATURE_IDENTIFICATION_SAVED_EVENT_TYPE, STREAMS_FEATURE_IDENTIFICATION_DELETED_EVENT_TYPE, STREAMS_DESCRIPTION_GENERATED_EVENT_TYPE, diff --git a/x-pack/platform/plugins/shared/streams_app/public/telemetry/events.ts b/x-pack/platform/plugins/shared/streams_app/public/telemetry/events.ts index 747883396a734..a7abbc3c52ecf 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/telemetry/events.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/telemetry/events.ts @@ -21,7 +21,6 @@ import { STREAMS_SIGNIFICANT_EVENTS_CREATED_EVENT_TYPE, STREAMS_SIGNIFICANT_EVENTS_SUGGESTIONS_GENERATED_EVENT_TYPE, STREAMS_WIRED_STREAMS_STATUS_CHANGED_EVENT_TYPE, - STREAMS_FEATURE_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, STREAMS_FEATURE_IDENTIFICATION_SAVED_EVENT_TYPE, STREAMS_FEATURE_IDENTIFICATION_DELETED_EVENT_TYPE, STREAMS_DESCRIPTION_GENERATED_EVENT_TYPE, @@ -42,7 +41,6 @@ import { streamsSignificantEventsCreatedSchema, streamsSignificantEventsSuggestionsGeneratedSchema, wiredStreamsStatusChangedSchema, - streamsFeatureIdentificationIdentifiedSchema, streamsFeatureIdentificationSavedSchema, streamsFeatureIdentificationDeletedSchema, streamsDescriptionGeneratedSchema, @@ -119,11 +117,6 @@ const streamsSignificantEventsCreatedEventType = { schema: streamsSignificantEventsCreatedSchema, }; -const streamsFeatureIdentificationIdentifiedEventType = { - eventType: STREAMS_FEATURE_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, - schema: streamsFeatureIdentificationIdentifiedSchema, -}; - const streamsFeatureIdentificationSavedEventType = { eventType: STREAMS_FEATURE_IDENTIFICATION_SAVED_EVENT_TYPE, schema: streamsFeatureIdentificationSavedSchema, @@ -165,7 +158,6 @@ export { streamsSignificantEventsSuggestionsGeneratedEventType, streamsSignificantEventsCreatedEventType, wiredStreamsStatusChangedEventType, - streamsFeatureIdentificationIdentifiedEventType, streamsFeatureIdentificationSavedEventType, streamsFeatureIdentificationDeletedEventType, streamsDescriptionGeneratedEventType, diff --git a/x-pack/platform/plugins/shared/streams_app/public/telemetry/schemas.ts b/x-pack/platform/plugins/shared/streams_app/public/telemetry/schemas.ts index 59e6e218d7aa4..3b8dfc1069130 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/telemetry/schemas.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/telemetry/schemas.ts @@ -23,7 +23,6 @@ import type { StreamsSignificantEventsCreatedProps, StreamsSignificantEventsSuggestionsGeneratedEventProps, WiredStreamsStatusChangedProps, - StreamsFeatureIdentificationIdentifiedProps, StreamsFeatureIdentificationSavedProps, StreamsFeatureIdentificationDeletedProps, StreamsDescriptionGeneratedProps, @@ -385,41 +384,6 @@ const streamsSignificantEventsCreatedSchema: RootSchema = - { - count: { - type: 'long', - _meta: { - description: 'The number of features identified', - }, - }, - count_by_type: countByTypes, - input_tokens_used: { - type: 'long', - _meta: { - description: 'The number of input tokens used for the generation request', - }, - }, - output_tokens_used: { - type: 'long', - _meta: { - description: 'The number of output tokens used for the generation request', - }, - }, - stream_type: { - type: 'keyword', - _meta: { - description: 'The type of the stream: wired or classic', - }, - }, - stream_name: { - type: 'keyword', - _meta: { - description: 'The name of the Stream', - }, - }, - }; - const streamsFeatureIdentificationSavedSchema: RootSchema = { count: { @@ -581,7 +545,6 @@ export { streamsSignificantEventsSuggestionsGeneratedSchema, streamsSignificantEventsCreatedSchema, wiredStreamsStatusChangedSchema, - streamsFeatureIdentificationIdentifiedSchema, streamsFeatureIdentificationSavedSchema, streamsFeatureIdentificationDeletedSchema, streamsDescriptionGeneratedSchema, diff --git a/x-pack/platform/plugins/shared/streams_app/public/telemetry/service.ts b/x-pack/platform/plugins/shared/streams_app/public/telemetry/service.ts index 05b8f24ed37d5..24a860ad37a15 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/telemetry/service.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/telemetry/service.ts @@ -22,7 +22,6 @@ import { streamsSignificantEventsCreatedEventType, streamsSignificantEventsSuggestionsGeneratedEventType, wiredStreamsStatusChangedEventType, - streamsFeatureIdentificationIdentifiedEventType, streamsFeatureIdentificationSavedEventType, streamsFeatureIdentificationDeletedEventType, streamsDescriptionGeneratedEventType, @@ -52,7 +51,6 @@ export class StreamsTelemetryService { this.analytics.registerEventType(streamsSignificantEventsSuggestionsGeneratedEventType); this.analytics.registerEventType(streamsSignificantEventsCreatedEventType); this.analytics.registerEventType(wiredStreamsStatusChangedEventType); - this.analytics.registerEventType(streamsFeatureIdentificationIdentifiedEventType); this.analytics.registerEventType(streamsFeatureIdentificationSavedEventType); this.analytics.registerEventType(streamsFeatureIdentificationDeletedEventType); this.analytics.registerEventType(streamsDescriptionGeneratedEventType); diff --git a/x-pack/platform/plugins/shared/streams_app/public/telemetry/types.ts b/x-pack/platform/plugins/shared/streams_app/public/telemetry/types.ts index c11857e20858d..cd47187691174 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/telemetry/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/telemetry/types.ts @@ -123,15 +123,6 @@ interface StreamsSignificantEventsCreatedProps { stream_type: StreamType; } -interface StreamsFeatureIdentificationIdentifiedProps { - count: number; - count_by_type: CountByFeatureType; - input_tokens_used: number; - output_tokens_used: number; - stream_name: string; - stream_type: StreamType; -} - interface StreamsFeatureIdentificationSavedProps { count: number; count_by_type: CountByFeatureType; @@ -202,7 +193,6 @@ export { type StreamsSignificantEventsCreatedProps, type WiredStreamsStatusChangedProps, type StreamsFeatureIdentificationSavedProps, - type StreamsFeatureIdentificationIdentifiedProps, type StreamsFeatureIdentificationDeletedProps, type StreamsDescriptionGeneratedProps, type StreamsProcessingSimulationSamplesFetchLatencyProps, diff --git a/x-pack/platform/plugins/shared/streams_app/public/util/get_stream_type_from_definition.ts b/x-pack/platform/plugins/shared/streams_app/public/util/get_stream_type_from_definition.ts index 5e1abf787a018..007dd5e521195 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/util/get_stream_type_from_definition.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/util/get_stream_type_from_definition.ts @@ -5,18 +5,4 @@ * 2.0. */ -import { Streams } from '@kbn/streams-schema'; - -export function getStreamTypeFromDefinition( - definition: Streams.all.Definition -): 'wired' | 'classic' | 'unknown' { - if (Streams.WiredStream.Definition.is(definition)) { - return 'wired'; - } - - if (Streams.ClassicStream.Definition.is(definition)) { - return 'classic'; - } - - return 'unknown'; -} +export { getStreamTypeFromDefinition } from '@kbn/streams-schema'; diff --git a/x-pack/platform/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts b/x-pack/platform/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts index 98774cedb1d54..57b13d23e6694 100644 --- a/x-pack/platform/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts +++ b/x-pack/platform/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts @@ -226,6 +226,7 @@ export default function ({ getService }: FtrProviderContext) { 'session_cleanup', 'slo:bulk-delete-task', 'slo:temp-summary-cleanup-task', + 'streams_feature_identification', 'task_manager:delete_inactive_background_task_nodes', 'task_manager:invalidate_api_keys', 'task_manager:mark_removed_tasks_as_unrecognized',