diff --git a/x-pack/platform/packages/shared/kbn-evals-suite-streams/moon.yml b/x-pack/platform/packages/shared/kbn-evals-suite-streams/moon.yml index 1314a096e888b..7145548e6f9b0 100644 --- a/x-pack/platform/packages/shared/kbn-evals-suite-streams/moon.yml +++ b/x-pack/platform/packages/shared/kbn-evals-suite-streams/moon.yml @@ -19,7 +19,6 @@ project: sourceRoot: x-pack/platform/packages/shared/kbn-evals-suite-streams dependsOn: - '@kbn/evals' - - '@kbn/sse-utils-client' - '@kbn/object-utils' - '@kbn/grok-heuristics' - '@kbn/tooling-log' @@ -35,6 +34,7 @@ dependsOn: - '@kbn/dev-cli-runner' - '@kbn/core' - '@kbn/dissect-heuristics' + - '@kbn/sse-utils-client' tags: - functional-tests - package diff --git a/x-pack/platform/packages/shared/kbn-evals-suite-streams/tsconfig.json b/x-pack/platform/packages/shared/kbn-evals-suite-streams/tsconfig.json index ec43d391caba9..2aff215f802f9 100644 --- a/x-pack/platform/packages/shared/kbn-evals-suite-streams/tsconfig.json +++ b/x-pack/platform/packages/shared/kbn-evals-suite-streams/tsconfig.json @@ -8,7 +8,6 @@ "exclude": ["target/**/*"], "kbn_references": [ "@kbn/evals", - "@kbn/sse-utils-client", "@kbn/object-utils", "@kbn/grok-heuristics", "@kbn/tooling-log", @@ -24,5 +23,6 @@ "@kbn/dev-cli-runner", "@kbn/core", "@kbn/dissect-heuristics", + "@kbn/sse-utils-client", ] } diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/index.ts b/x-pack/platform/packages/shared/kbn-streams-schema/index.ts index b88a6b5058a46..9bb1a2d05644e 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/index.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/index.ts @@ -134,6 +134,8 @@ export type { SignificantEventsPreviewResponse, SignificantEventsGenerateResponse, GeneratedSignificantEventQuery, + SignificantEventsQueriesGenerationResult, + SignificantEventsQueriesGenerationTaskResult, } from './src/api/significant_events'; export { emptyAssets } from './src/helpers/empty_assets'; diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/api/significant_events/index.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/api/significant_events/index.ts index bdac78d65fee7..23858307f45c7 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/api/significant_events/index.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/api/significant_events/index.ts @@ -10,6 +10,7 @@ import type { ServerSentEventBase } from '@kbn/sse-utils'; import type { Condition } from '@kbn/streamlang'; import type { ChatCompletionTokenCount } from '@kbn/inference-common'; import type { StreamQueryKql } from '../../queries'; +import type { TaskStatus } from '../../tasks/types'; /** * SignificantEvents Get Response @@ -71,10 +72,34 @@ type SignificantEventsGenerateResponse = Observable< > >; +interface SignificantEventsQueriesGenerationResult { + queries: GeneratedSignificantEventQuery[]; + tokensUsed: Pick; +} + +type SignificantEventsQueriesGenerationTaskResult = + | { + status: + | TaskStatus.NotStarted + | TaskStatus.InProgress + | TaskStatus.Stale + | TaskStatus.BeingCanceled + | TaskStatus.Canceled; + } + | { + status: TaskStatus.Failed; + error: string; + } + | ({ + status: TaskStatus.Completed | TaskStatus.Acknowledged; + } & SignificantEventsQueriesGenerationResult); + export type { SignificantEventsResponse, SignificantEventsGetResponse, SignificantEventsPreviewResponse, GeneratedSignificantEventQuery, SignificantEventsGenerateResponse, + SignificantEventsQueriesGenerationResult, + SignificantEventsQueriesGenerationTaskResult, }; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/cancellable_task.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/cancellable_task.ts index 4a6e4bb3ead60..8361de38db2fa 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/tasks/cancellable_task.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/cancellable_task.ts @@ -33,7 +33,7 @@ export function cancellableTask( taskContext.logger.trace( `Cancellable task check loop for task ${runContext.taskInstance.id}: status is ${task.status}` ); - if (task.status === 'being_canceled') { + if (task.status === TaskStatus.BeingCanceled) { runContext.abortController.abort(); await taskClient.update({ ...task, diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/index.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/index.ts index 6547db804f99b..69f05ccc0a91a 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/index.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/index.ts @@ -9,6 +9,7 @@ import type { Logger } from '@kbn/core/server'; import type { TaskDefinitionRegistry } from '@kbn/task-manager-plugin/server'; import type { GetScopedClients } from '../../../routes/types'; import { createStreamsSystemIdentificationTask } from './system_identification'; +import { createStreamsSignificantEventsQueriesGenerationTask } from './significant_events_queries_generation'; import type { EbtTelemetryClient } from '../../telemetry'; export interface TaskContext { @@ -20,6 +21,7 @@ export interface TaskContext { export function createTaskDefinitions(taskContext: TaskContext) { return { ...createStreamsSystemIdentificationTask(taskContext), + ...createStreamsSignificantEventsQueriesGenerationTask(taskContext), } satisfies TaskDefinitionRegistry; } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/significant_events_queries_generation.ts b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/significant_events_queries_generation.ts new file mode 100644 index 0000000000000..2987539a6e4f3 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/tasks/task_definitions/significant_events_queries_generation.ts @@ -0,0 +1,180 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { TaskDefinitionRegistry } from '@kbn/task-manager-plugin/server'; +import { isInferenceProviderError } from '@kbn/inference-common'; +import { + TaskStatus, + getStreamTypeFromDefinition, + type SignificantEventsQueriesGenerationResult, + type System, +} from '@kbn/streams-schema'; +import pLimit from 'p-limit'; +import { formatInferenceProviderError } from '../../../routes/utils/create_connector_sse_error'; +import type { TaskContext } from '.'; +import type { TaskParams } from '../types'; +import { PromptsConfigService } from '../../saved_objects/significant_events/prompts_config_service'; +import { cancellableTask } from '../cancellable_task'; +import { generateSignificantEventDefinitions } from '../../significant_events/generate_significant_events'; + +export interface SignificantEventsQueriesGenerationTaskParams { + connectorId: string; + start: number; + end: number; + systems?: System[]; + sampleDocsSize?: number; +} + +export const SIGNIFICANT_EVENTS_QUERIES_GENERATION_TASK_TYPE = + 'streams_significant_events_queries_generation'; + +export function createStreamsSignificantEventsQueriesGenerationTask(taskContext: TaskContext) { + return { + [SIGNIFICANT_EVENTS_QUERIES_GENERATION_TASK_TYPE]: { + createTaskRunner: (runContext) => { + return { + run: cancellableTask( + async () => { + if (!runContext.fakeRequest) { + throw new Error('Request is required to run this task'); + } + + const { connectorId, start, end, systems, sampleDocsSize, _task } = runContext + .taskInstance.params as TaskParams; + const { stream: name } = _task; + + const { taskClient, scopedClusterClient, streamsClient, inferenceClient, soClient } = + await taskContext.getScopedClients({ + request: runContext.fakeRequest, + }); + + try { + const stream = await streamsClient.getStream(name); + + const esClient = scopedClusterClient.asCurrentUser; + + const promptsConfigService = new PromptsConfigService({ + soClient, + logger: taskContext.logger, + }); + + const { significantEventsPromptOverride } = await promptsConfigService.getPrompt(); + + // If no systems are passed, generate for all data + // If systems are passed, generate for each system with concurrency limit + const systemsToProcess: Array = + systems && systems.length > 0 ? systems : [undefined]; + + // Process systems with concurrency limit to avoid overwhelming the LLM provider + const CONCURRENCY_LIMIT = 3; + const limiter = pLimit(CONCURRENCY_LIMIT); + + const resultsArray = await Promise.all( + systemsToProcess.map((system) => + limiter(() => + generateSignificantEventDefinitions( + { + definition: stream, + connectorId, + start, + end, + system, + sampleDocsSize, + systemPromptOverride: significantEventsPromptOverride, + }, + { + inferenceClient, + esClient, + logger: taskContext.logger.get('significant_events_generation'), + signal: runContext.abortController.signal, + } + ) + ) + ) + ); + + // Combine results from all parallel generations in a single pass + const combinedResults = + resultsArray.reduce( + (acc, result) => { + acc.queries.push(...result.queries); + acc.tokensUsed.prompt += result.tokensUsed.prompt; + acc.tokensUsed.completion += result.tokensUsed.completion; + return acc; + }, + { queries: [], tokensUsed: { prompt: 0, completion: 0 } } + ); + + taskContext.telemetry.trackSignificantEventsQueriesGenerated({ + count: combinedResults.queries.length, + systems_count: systems?.length ?? 0, + stream_name: stream.name, + stream_type: getStreamTypeFromDefinition(stream), + input_tokens_used: combinedResults.tokensUsed.prompt, + output_tokens_used: combinedResults.tokensUsed.completion, + }); + + await taskClient.update< + SignificantEventsQueriesGenerationTaskParams, + SignificantEventsQueriesGenerationResult + >({ + ..._task, + status: TaskStatus.Completed, + task: { + params: { + connectorId, + start, + end, + systems, + sampleDocsSize, + }, + payload: combinedResults, + }, + }); + } catch (error) { + // Get connector info for error enrichment + const connector = await inferenceClient.getConnectorById(connectorId); + + const errorMessage = isInferenceProviderError(error) + ? formatInferenceProviderError(error, connector) + : error.message; + + if ( + errorMessage.includes('ERR_CANCELED') || + errorMessage.includes('Request was aborted') + ) { + return; + } + + taskContext.logger.error( + `Task ${runContext.taskInstance.id} failed: ${errorMessage}` + ); + + await taskClient.update({ + ..._task, + status: TaskStatus.Failed, + task: { + params: { + connectorId, + start, + end, + systems, + sampleDocsSize, + }, + error: errorMessage, + }, + }); + } + }, + runContext, + taskContext + ), + }; + }, + }, + } satisfies TaskDefinitionRegistry; +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/client.ts b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/client.ts index f2026a38c99ed..29860ce0a3b85 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/client.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/client.ts @@ -8,11 +8,13 @@ import type { AnalyticsServiceSetup } from '@kbn/core-analytics-server'; import type { StreamEndpointLatencyProps, + StreamsSignificantEventsQueriesGeneratedProps, StreamsStateErrorProps, StreamsSystemIdentificationIdentifiedProps, } from './types'; import { STREAMS_ENDPOINT_LATENCY_EVENT, + STREAMS_SIGNIFICANT_EVENTS_QUERIES_GENERATED_EVENT_TYPE, STREAMS_STATE_ERROR_EVENT, STREAMS_SYSTEM_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, } from './constants'; @@ -65,4 +67,10 @@ export class EbtTelemetryClient { public trackSystemsIdentified(params: StreamsSystemIdentificationIdentifiedProps) { this.analytics.reportEvent(STREAMS_SYSTEM_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, params); } + + public trackSignificantEventsQueriesGenerated( + params: StreamsSignificantEventsQueriesGeneratedProps + ) { + this.analytics.reportEvent(STREAMS_SIGNIFICANT_EVENTS_QUERIES_GENERATED_EVENT_TYPE, params); + } } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/constants.ts b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/constants.ts index 7308fc9d4d5e2..fe72cff881894 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/constants.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/constants.ts @@ -9,9 +9,12 @@ const STREAMS_ENDPOINT_LATENCY_EVENT = 'streams-endpoint-latency'; const STREAMS_STATE_ERROR_EVENT = 'streams-state-error'; const STREAMS_SYSTEM_IDENTIFICATION_IDENTIFIED_EVENT_TYPE = 'streams-system-identification-identified'; +const STREAMS_SIGNIFICANT_EVENTS_QUERIES_GENERATED_EVENT_TYPE = + 'streams-significant-events-queries-generated'; export { STREAMS_ENDPOINT_LATENCY_EVENT, STREAMS_STATE_ERROR_EVENT, STREAMS_SYSTEM_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, + STREAMS_SIGNIFICANT_EVENTS_QUERIES_GENERATED_EVENT_TYPE, }; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/events.ts b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/events.ts index 4abc49ca71d9a..34f6bd5ae42fc 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/events.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/events.ts @@ -9,11 +9,13 @@ import { STREAMS_ENDPOINT_LATENCY_EVENT, STREAMS_STATE_ERROR_EVENT, STREAMS_SYSTEM_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, + STREAMS_SIGNIFICANT_EVENTS_QUERIES_GENERATED_EVENT_TYPE, } from './constants'; import { streamsEndpointLatencySchema, streamsStateErrorSchema, streamsSystemIdentificationIdentifiedSchema, + streamsSignificantEventsQueriesGeneratedSchema, } from './schemas'; const streamsEndpointLatencyEventType = { @@ -31,8 +33,14 @@ const streamsSystemIdentificationIdentifiedEventType = { schema: streamsSystemIdentificationIdentifiedSchema, }; +const streamsSignificantEventsGeneratedEventType = { + eventType: STREAMS_SIGNIFICANT_EVENTS_QUERIES_GENERATED_EVENT_TYPE, + schema: streamsSignificantEventsQueriesGeneratedSchema, +}; + export { streamsEndpointLatencyEventType, streamsStateErrorEventType, streamsSystemIdentificationIdentifiedEventType, + streamsSignificantEventsGeneratedEventType, }; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/schemas.ts b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/schemas.ts index c00c6f4ed4091..2b88203e0712d 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/schemas.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/schemas.ts @@ -9,6 +9,7 @@ import type { RootSchema } from '@elastic/ebt/client'; import type { StreamEndpointLatencyProps, StreamsSystemIdentificationIdentifiedProps, + StreamsSignificantEventsQueriesGeneratedProps, StreamsStateErrorProps, } from './types'; @@ -99,8 +100,49 @@ const streamsSystemIdentificationIdentifiedSchema: RootSchema = + { + count: { + type: 'long', + _meta: { + description: 'The number of significant events queries generated', + }, + }, + systems_count: { + type: 'long', + _meta: { + description: 'The number of systems used to generate the queries', + }, + }, + input_tokens_used: { + type: 'long', + _meta: { + description: 'The number of input tokens used for the generation request', + }, + }, + output_tokens_used: { + type: 'long', + _meta: { + description: 'The number of output tokens used for the generation request', + }, + }, + stream_type: { + type: 'keyword', + _meta: { + description: 'The type of the stream: wired or classic', + }, + }, + stream_name: { + type: 'keyword', + _meta: { + description: 'The name of the Stream', + }, + }, + }; + export { streamsEndpointLatencySchema, streamsStateErrorSchema, streamsSystemIdentificationIdentifiedSchema, + streamsSignificantEventsQueriesGeneratedSchema, }; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/service.ts b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/service.ts index 79d04f0a85187..1aa5acd96b2ae 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/service.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/service.ts @@ -10,6 +10,7 @@ import { streamsEndpointLatencyEventType, streamsStateErrorEventType, streamsSystemIdentificationIdentifiedEventType, + streamsSignificantEventsGeneratedEventType, } from './events'; import { EbtTelemetryClient } from './client'; @@ -23,6 +24,7 @@ export class EbtTelemetryService { this.analytics.registerEventType(streamsEndpointLatencyEventType); this.analytics.registerEventType(streamsStateErrorEventType); this.analytics.registerEventType(streamsSystemIdentificationIdentifiedEventType); + this.analytics.registerEventType(streamsSignificantEventsGeneratedEventType); } public getClient() { diff --git a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/types.ts b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/types.ts index 7313e39a38522..3df992c003c01 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/types.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/telemetry/ebt/types.ts @@ -30,8 +30,18 @@ interface StreamsSystemIdentificationIdentifiedProps { stream_type: StreamType; } +interface StreamsSignificantEventsQueriesGeneratedProps { + count: number; + systems_count: number; + input_tokens_used: number; + output_tokens_used: number; + stream_name: string; + stream_type: StreamType; +} + export { type StreamEndpointLatencyProps, type StreamsStateErrorProps, type StreamsSystemIdentificationIdentifiedProps, + type StreamsSignificantEventsQueriesGeneratedProps, }; diff --git a/x-pack/platform/plugins/shared/streams/server/routes/index.ts b/x-pack/platform/plugins/shared/streams/server/routes/index.ts index 61bbd2523f362..6c2f4a619d536 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/index.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/index.ts @@ -18,6 +18,7 @@ import { internalCrudRoutes } from './internal/streams/crud/route'; import { internalManagementRoutes } from './internal/streams/management/route'; import { systemRoutes as internalSystemsRoutes } from './internal/streams/systems/route'; import { internalPromptsRoutes } from './internal/streams/prompts/route'; +import { internalSignificantEventsRoutes } from './internal/streams/significant_events/route'; import { significantEventsRoutes } from './streams/significant_events/route'; import { queryRoutes } from './queries/route'; import { failureStoreRoutes } from './internal/streams/failure_store/route'; @@ -37,6 +38,7 @@ export const streamsRouteRepository = { ...failureStoreRoutes, ...internalSystemsRoutes, ...internalPromptsRoutes, + ...internalSignificantEventsRoutes, ...internalIngestRoutes, ...connectorRoutes, ...internalAttachmentRoutes, diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/significant_events/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/significant_events/route.ts new file mode 100644 index 0000000000000..573e8fe592391 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/significant_events/route.ts @@ -0,0 +1,225 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { conflict } from '@hapi/boom'; +import { + TaskStatus, + systemSchema, + type SignificantEventsQueriesGenerationResult, + type SignificantEventsQueriesGenerationTaskResult, +} from '@kbn/streams-schema'; +import { z } from '@kbn/zod'; +import { STREAMS_API_PRIVILEGES } from '../../../../../common/constants'; +import { AcknowledgingIncompleteError } from '../../../../lib/tasks/acknowledging_incomplete_error'; +import { CancellationInProgressError } from '../../../../lib/tasks/cancellation_in_progress_error'; +import { isStale } from '../../../../lib/tasks/is_stale'; +import { + SIGNIFICANT_EVENTS_QUERIES_GENERATION_TASK_TYPE, + type SignificantEventsQueriesGenerationTaskParams, +} from '../../../../lib/tasks/task_definitions/significant_events_queries_generation'; +import { createServerRoute } from '../../../create_server_route'; +import { assertSignificantEventsAccess } from '../../../utils/assert_significant_events_access'; +import { resolveConnectorId } from '../../../utils/resolve_connector_id'; + +// Make sure strings are expected for input, but still converted to a +// Date, without breaking the OpenAPI generator +const dateFromString = z.string().transform((input) => new Date(input)); + +function getSignificantEventsQueriesGenerationTaskId(streamName: string) { + return `${SIGNIFICANT_EVENTS_QUERIES_GENERATION_TASK_TYPE}_${streamName}`; +} + +const significantEventsQueriesGenerationStatusRoute = createServerRoute({ + endpoint: 'GET /internal/streams/{name}/significant_events/_status', + params: z.object({ + path: z.object({ name: z.string().describe('The name of the stream') }), + }), + options: { + access: 'internal', + summary: 'Check the status of significant events query generation', + description: + 'Significant events query generation happens as a background task, this endpoint allows the user to check the status of this task.', + }, + security: { + authz: { + requiredPrivileges: [STREAMS_API_PRIVILEGES.read], + }, + }, + handler: async ({ + params, + request, + getScopedClients, + server, + }): Promise => { + const { streamsClient, licensing, uiSettingsClient, taskClient } = await getScopedClients({ + request, + }); + + await assertSignificantEventsAccess({ server, licensing, uiSettingsClient }); + await streamsClient.ensureStream(params.path.name); + + const { name } = params.path; + + const task = await taskClient.get< + SignificantEventsQueriesGenerationTaskParams, + SignificantEventsQueriesGenerationResult + >(getSignificantEventsQueriesGenerationTaskId(name)); + + if (task.status === TaskStatus.InProgress) { + return isStale(task.created_at) ? { status: TaskStatus.Stale } : { status: task.status }; + } else if (task.status === TaskStatus.Failed) { + return { + status: task.status, + error: task.task.error, + }; + } else if (task.status === TaskStatus.Completed || task.status === TaskStatus.Acknowledged) { + return { + status: task.status, + ...task.task.payload, + }; + } + + // Return status for remaining states: not_started, canceled, being_canceled + return { + status: task.status, + }; + }, +}); + +const significantEventsQueriesGenerationTaskRoute = createServerRoute({ + endpoint: 'POST /internal/streams/{name}/significant_events/_task', + params: z.object({ + path: z.object({ name: z.string().describe('The name of the stream') }), + body: z.discriminatedUnion('action', [ + z.object({ + action: z.literal('schedule').describe('Schedule a new generation task'), + from: dateFromString.describe('Start of the time range'), + to: dateFromString.describe('End of the time range'), + connectorId: z + .string() + .optional() + .describe( + 'Optional connector ID. If not provided, the default AI connector from settings will be used.' + ), + sampleDocsSize: z + .number() + .optional() + .describe( + 'Number of sample documents to use for generation from the current data of stream' + ), + systems: z.array(systemSchema).optional().describe('Optional array of systems'), + }), + z.object({ + action: z.literal('cancel').describe('Cancel an in-progress generation task'), + }), + z.object({ + action: z.literal('acknowledge').describe('Acknowledge a completed generation task'), + }), + ]), + }), + options: { + access: 'internal', + summary: 'Manage significant events query generation task', + description: + 'Manage the lifecycle of the background task that generates significant events queries based on the stream data.', + }, + security: { + authz: { + requiredPrivileges: [STREAMS_API_PRIVILEGES.manage], + }, + }, + handler: async ({ + params, + request, + getScopedClients, + server, + logger, + }): Promise => { + const { streamsClient, licensing, uiSettingsClient, taskClient } = await getScopedClients({ + request, + }); + + await assertSignificantEventsAccess({ server, licensing, uiSettingsClient }); + await streamsClient.ensureStream(params.path.name); + + const { name } = params.path; + const { action } = params.body; + + if (action === 'schedule') { + const { + from: start, + to: end, + connectorId: connectorIdParam, + sampleDocsSize, + systems, + } = params.body; + + try { + const connectorId = await resolveConnectorId({ + connectorId: connectorIdParam, + uiSettingsClient, + logger, + }); + await taskClient.schedule({ + task: { + type: SIGNIFICANT_EVENTS_QUERIES_GENERATION_TASK_TYPE, + id: getSignificantEventsQueriesGenerationTaskId(name), + space: '*', + stream: name, + }, + params: { + connectorId, + start: start.getTime(), + end: end.getTime(), + systems, + sampleDocsSize, + }, + request, + }); + + return { + status: TaskStatus.InProgress, + }; + } catch (error) { + if (error instanceof CancellationInProgressError) { + throw conflict(error.message); + } + + throw error; + } + } else if (action === 'cancel') { + await taskClient.cancel(getSignificantEventsQueriesGenerationTaskId(name)); + + return { + status: TaskStatus.BeingCanceled, + }; + } + + // action === 'acknowledge' + try { + const task = await taskClient.acknowledge< + SignificantEventsQueriesGenerationTaskParams, + SignificantEventsQueriesGenerationResult + >(getSignificantEventsQueriesGenerationTaskId(name)); + + return { + status: TaskStatus.Acknowledged, + ...task.task.payload, + }; + } catch (error) { + if (error instanceof AcknowledgingIncompleteError) { + throw conflict(error.message); + } + + throw error; + } + }, +}); + +export const internalSignificantEventsRoutes = { + ...significantEventsQueriesGenerationStatusRoute, + ...significantEventsQueriesGenerationTaskRoute, +}; diff --git a/x-pack/platform/plugins/shared/streams/server/routes/streams/significant_events/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/streams/significant_events/route.ts index 0f65cd6cb32d0..9c4edd6887be4 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/streams/significant_events/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/streams/significant_events/route.ts @@ -4,6 +4,7 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ +import { conditionSchema } from '@kbn/streamlang'; import { systemSchema, type SignificantEventsGenerateResponse, @@ -11,18 +12,17 @@ import { type SignificantEventsPreviewResponse, } from '@kbn/streams-schema'; import { z } from '@kbn/zod'; -import { conditionSchema } from '@kbn/streamlang'; -import { from as fromRxjs, map, catchError } from 'rxjs'; -import { PromptsConfigService } from '../../../lib/saved_objects/significant_events/prompts_config_service'; -import { createConnectorSSEError } from '../../utils/create_connector_sse_error'; -import { resolveConnectorId } from '../../utils/resolve_connector_id'; +import { catchError, from as fromRxjs, map } from 'rxjs'; import { STREAMS_API_PRIVILEGES } from '../../../../common/constants'; +import { PromptsConfigService } from '../../../lib/saved_objects/significant_events/prompts_config_service'; import { generateSignificantEventDefinitions } from '../../../lib/significant_events/generate_significant_events'; import { previewSignificantEvents } from '../../../lib/significant_events/preview_significant_events'; import { readSignificantEventsFromAlertsIndices } from '../../../lib/significant_events/read_significant_events_from_alerts_indices'; import { createServerRoute } from '../../create_server_route'; import { assertSignificantEventsAccess } from '../../utils/assert_significant_events_access'; +import { createConnectorSSEError } from '../../utils/create_connector_sse_error'; import { getRequestAbortSignal } from '../../utils/get_request_abort_signal'; +import { resolveConnectorId } from '../../utils/resolve_connector_id'; // Make sure strings are expected for input, but still converted to a // Date, without breaking the OpenAPI generator diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/add_significant_event_flyout.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/add_significant_event_flyout.tsx index 70e829daeff37..07129552ccf79 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/add_significant_event_flyout.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/add_significant_event_flyout.tsx @@ -26,8 +26,7 @@ import { streamQuerySchema } from '@kbn/streams-schema'; import React, { useCallback, useEffect, useMemo, useRef, useState } from 'react'; import { css } from '@emotion/css'; import { v4 } from 'uuid'; -import { from, concatMap } from 'rxjs'; -import { getStreamTypeFromDefinition } from '../../../util/get_stream_type_from_definition'; +import useAsyncFn from 'react-use/lib/useAsyncFn'; import { useKibana } from '../../../hooks/use_kibana'; import { useSignificantEventsApi } from '../../../hooks/use_significant_events_api'; import type { AIFeatures } from '../../../hooks/use_ai_features'; @@ -36,11 +35,10 @@ import { ManualFlowForm } from './manual_flow_form/manual_flow_form'; import type { Flow, SaveData } from './types'; import { defaultQuery } from './utils/default_query'; import { StreamsAppSearchBar } from '../../streams_app_search_bar'; -import { ALL_DATA_OPTION } from '../feature_selector'; import { validateQuery } from './common/validate_query'; import { useStreamsAppFetch } from '../../../hooks/use_streams_app_fetch'; +import { useTaskPolling } from '../../../hooks/use_task_polling'; import { SignificantEventsGenerationPanel } from '../generation_panel'; -import { useStreamDescriptionApi } from '../../stream_detail_features/stream_description/use_stream_description_api'; interface Props { refreshDefinition: () => void; @@ -71,8 +69,6 @@ export function AddSignificantEventFlyout({ }: Props) { const { euiTheme } = useEuiTheme(); const { - core: { notifications }, - services: { telemetryClient }, dependencies: { start: { data }, }, @@ -84,13 +80,8 @@ export function AddSignificantEventFlyout({ }); }, [data.dataViews, definition.stream.name]); - const { - onGenerateDescription: generateDescription, - onSaveDescription: saveDescription, - abort: abortDescription, - } = useStreamDescriptionApi({ definition, refreshDefinition, aiFeatures, silent: true }); - - const { generate, abort } = useSignificantEventsApi({ name: definition.stream.name }); + const { cancelGenerationTask, getGenerationTask, scheduleGenerationTask } = + useSignificantEventsApi({ name: definition.stream.name }); const isEditMode = !!query?.id; const [selectedFlow, setSelectedFlow] = useState( @@ -103,14 +94,68 @@ export function AddSignificantEventFlyout({ const [selectedFeatures, setSelectedFeatures] = useState(initialSelectedFeatures); - const [isGenerating, setIsGenerating] = useState(false); const [generatedQueries, setGeneratedQueries] = useState([]); + const [{ loading: isGettingTask, value: task }, getTask] = useAsyncFn(getGenerationTask); + const [{ loading: isSchedulingGenerationTask }, doScheduleGenerationTask] = + useAsyncFn(scheduleGenerationTask); + + useEffect(() => { + getTask(); + }, [getTask]); + + useTaskPolling(task, getGenerationTask, getTask); + + const isBeingCanceled = task?.status === 'being_canceled'; + const isGenerating = + task?.status === 'in_progress' || + isBeingCanceled || + isGettingTask || + isSchedulingGenerationTask; + + const prevTaskStatusRef = useRef(undefined); + + useEffect(() => { + const prevStatus = prevTaskStatusRef.current; + prevTaskStatusRef.current = task?.status; + + // Process completed when: + // - First time getting the task (prevStatus is undefined) + // - Transitioning from in_progress to completed + const isFirstLoad = prevStatus === undefined; + const isTransitionFromInProgress = prevStatus === 'in_progress'; + if ( + task?.status === 'completed' && + (isFirstLoad || isTransitionFromInProgress) && + !isGenerating + ) { + setGeneratedQueries( + task.queries + .filter((nextQuery) => { + const validation = validateQuery({ + title: nextQuery.title, + kql: { query: nextQuery.kql }, + }); + return validation.kql.isInvalid === false; + }) + .map((nextQuery) => ({ + id: v4(), + kql: { query: nextQuery.kql }, + title: nextQuery.title, + feature: nextQuery.feature, + severity_score: nextQuery.severity_score, + evidence: nextQuery.evidence, + })) + ); + } + }, [isGenerating, task]); const stopGeneration = useCallback(() => { - setIsGenerating(false); - abort(); - abortDescription(); - }, [abort, abortDescription]); + if (task?.status === 'in_progress') { + cancelGenerationTask().then(() => { + getTask(); + }); + } + }, [cancelGenerationTask, getTask, task?.status]); const parsedQueries = useMemo(() => { return streamQuerySchema.array().safeParse(queries); @@ -127,113 +172,26 @@ export function AddSignificantEventFlyout({ const generateQueries = useCallback( (featuresOverride?: System[]) => { - setSelectedFlow('ai'); - - let numberOfGeneratedQueries = 0; - let inputTokensUsed = 0; - let outputTokensUsed = 0; - const connector = aiFeatures?.genAiConnectors.selectedConnector; - if (!connector) { + const connectorId = aiFeatures?.genAiConnectors.selectedConnector; + if (!connectorId) { return; } - setIsGenerating(true); + setSelectedFlow('ai'); setGeneratedQueries([]); const effectiveFeatures = featuresOverride ?? selectedFeatures; - const maybeGenerateDescription = - !definition.stream.description && effectiveFeatures.length === 0 - ? generateDescription().then((description) => saveDescription(description)) - : Promise.resolve(); - - maybeGenerateDescription.then(() => { - const startTime = Date.now(); - - from(effectiveFeatures.length === 0 ? [ALL_DATA_OPTION.value] : effectiveFeatures) - .pipe( - concatMap((feature) => - generate(connector, feature.type === 'all_data' ? undefined : feature).pipe( - concatMap(({ queries: nextQueries, tokensUsed }) => { - numberOfGeneratedQueries += nextQueries.length; - - inputTokensUsed += tokensUsed.prompt; - outputTokensUsed += tokensUsed.completion; - - setGeneratedQueries((prev) => [ - ...prev, - ...nextQueries - .filter((nextQuery) => { - const validation = validateQuery({ - title: nextQuery.title, - kql: { query: nextQuery.kql }, - }); - - return validation.kql.isInvalid === false; - }) - .map((nextQuery) => ({ - id: v4(), - kql: { query: nextQuery.kql }, - title: nextQuery.title, - feature: nextQuery.feature, - severity_score: nextQuery.severity_score, - evidence: nextQuery.evidence, - })), - ]); - - return []; - }) - ) - ) - ) - .subscribe({ - error: (error) => { - setIsGenerating(false); - if (error.name === 'AbortError') { - return; - } - notifications.showErrorDialog({ - title: i18n.translate( - 'xpack.streams.addSignificantEventFlyout.generateErrorToastTitle', - { - defaultMessage: `Could not generate significant events queries`, - } - ), - error, - }); - }, - complete: () => { - notifications.toasts.addSuccess({ - title: i18n.translate( - 'xpack.streams.addSignificantEventFlyout.generateSuccessToastTitle', - { defaultMessage: `Generated significant events queries successfully` } - ), - }); - telemetryClient.trackSignificantEventsSuggestionsGenerate({ - duration_ms: Date.now() - startTime, - input_tokens_used: inputTokensUsed, - output_tokens_used: outputTokensUsed, - count: numberOfGeneratedQueries, - features_selected: selectedFeatures?.length ?? 0, - features_total: features.length, - stream_name: definition.stream.name, - stream_type: getStreamTypeFromDefinition(definition.stream), - }); - setIsGenerating(false); - }, - }); - }); + (async () => { + await doScheduleGenerationTask(connectorId, effectiveFeatures); + getTask(); + })(); }, [ aiFeatures?.genAiConnectors.selectedConnector, selectedFeatures, - generate, - notifications, - telemetryClient, - features.length, - definition, - generateDescription, - saveDescription, + doScheduleGenerationTask, + getTask, ] ); @@ -346,6 +304,7 @@ export function AddSignificantEventFlyout({ {flowRef.current === 'ai' && ( )} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/generated_flow_form/generated_flow_form.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/generated_flow_form/generated_flow_form.tsx index 7a0de5f2cc6cc..3da8c4f35f675 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/generated_flow_form/generated_flow_form.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/generated_flow_form/generated_flow_form.tsx @@ -5,16 +5,19 @@ * 2.0. */ +import { EuiCallOut } from '@elastic/eui'; import type { StreamQueryKql, System } from '@kbn/streams-schema'; import type { Streams } from '@kbn/streams-schema'; import React, { useEffect, useState } from 'react'; import type { DataView } from '@kbn/data-views-plugin/public'; +import { i18n } from '@kbn/i18n'; import { SignificantEventsGeneratedTable } from './significant_events_generated_table'; import { AiFlowEmptyState } from './empty_state'; import { AiFlowWaitingForGeneration } from './waiting_for_generation'; interface Props { isGenerating: boolean; + isBeingCanceled: boolean; generatedQueries: StreamQueryKql[]; onEditQuery: (query: StreamQueryKql) => void; stopGeneration: () => void; @@ -24,10 +27,13 @@ interface Props { setCanSave: (canSave: boolean) => void; features: Omit[]; dataViews: DataView[]; + taskStatus?: string; + taskError?: string; } export function GeneratedFlowForm({ isGenerating, + isBeingCanceled, generatedQueries, onEditQuery, stopGeneration, @@ -37,6 +43,8 @@ export function GeneratedFlowForm({ isSubmitting, features, dataViews, + taskStatus, + taskError, }: Props) { const [selectedQueries, setSelectedQueries] = useState([]); const [isEditingQueries, setIsEditingQueries] = useState(false); @@ -50,12 +58,46 @@ export function GeneratedFlowForm({ setCanSave(!isEditingQueries && selectedQueries.length > 0); }, [selectedQueries, isEditingQueries, setCanSave]); + if (!isGenerating && (taskStatus === 'failed' || taskStatus === 'stale')) { + const isFailed = taskStatus === 'failed'; + return ( + + {isFailed + ? taskError + : i18n.translate( + 'xpack.streams.streamDetailView.addSignificantEventFlyout.generationStaleDescription', + { defaultMessage: 'The generation task took too long and was marked as stale.' } + )} + + ); + } + if (!isGenerating && generatedQueries.length === 0) { return ; } if (isGenerating && generatedQueries.length === 0) { - return ; + return ( + + ); } if (!isGenerating && generatedQueries.length === 0) { @@ -63,7 +105,12 @@ export function GeneratedFlowForm({ } if (isGenerating && generatedQueries.length === 0) { - return ; + return ( + + ); } return ( @@ -80,7 +127,11 @@ export function GeneratedFlowForm({ dataViews={dataViews} /> {isGenerating && ( - + )} ); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/generated_flow_form/waiting_for_generation.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/generated_flow_form/waiting_for_generation.tsx index 824557d05128c..1d1c12f4ffef8 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/generated_flow_form/waiting_for_generation.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/add_significant_event_flyout/generated_flow_form/waiting_for_generation.tsx @@ -13,9 +13,11 @@ import { useWaitingForAiMessage } from '../../../../hooks/use_waiting_for_ai_mes export function AiFlowWaitingForGeneration({ stopGeneration, hasInitialResults = false, + isBeingCanceled = false, }: { stopGeneration: () => void; hasInitialResults?: boolean; + isBeingCanceled?: boolean; }) { const label = useWaitingForAiMessage(hasInitialResults); @@ -30,18 +32,26 @@ export function AiFlowWaitingForGeneration({ - {label} - - {i18n.translate( - 'xpack.streams.aiFlowWaitingForGeneration.button.stopGenerationButtonLabel', - { defaultMessage: 'Stop' } - )} - + {isBeingCanceled + ? i18n.translate('xpack.streams.aiFlowWaitingForGeneration.cancelingGenerationLabel', { + defaultMessage: 'Canceling generation...', + }) + : label} + {!isBeingCanceled && ( + + + {i18n.translate( + 'xpack.streams.aiFlowWaitingForGeneration.button.stopGenerationButtonLabel', + { defaultMessage: 'Stop' } + )} + + + )} ); } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/edit_significant_event_flyout.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/edit_significant_event_flyout.tsx index 47988bccd60de..65ebd18269be8 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/edit_significant_event_flyout.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/edit_significant_event_flyout.tsx @@ -51,7 +51,7 @@ export const EditSignificantEventFlyout = ({ services: { telemetryClient }, } = useKibana(); - const { upsertQuery, bulk } = useSignificantEventsApi({ + const { upsertQuery, bulk, acknowledgeGenerationTask } = useSignificantEventsApi({ name: definition.stream.name, }); @@ -109,7 +109,12 @@ export const EditSignificantEventFlyout = ({ index: query, })) ).then( - () => { + async () => { + // Acknowledge the task after successful save + await acknowledgeGenerationTask().catch(() => { + // Ignore errors - task acknowledgment is not critical + }); + notifications.toasts.addSuccess({ title: i18n.translate( 'xpack.streams.significantEvents.savedMultiple.successfullyToastTitle', diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/index.tsx index 276e738c23b9b..77f7512063ca0 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/index.tsx @@ -180,6 +180,7 @@ export function StreamDetailSignificantEventsView({ definition, refreshDefinitio size="s" color="primary" onClick={() => { + setSelectedFeatures([]); setIsEditFlyoutOpen(true); setQueryToEdit(undefined); }} @@ -250,7 +251,7 @@ export function StreamDetailSignificantEventsView({ definition, refreshDefinitio /> - {editFlyout(false)} + {editFlyout(selectedFeatures.length > 0)} ); } diff --git a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_significant_events_api.ts b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_significant_events_api.ts index dd15d7f41ea64..02690382d3db0 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_significant_events_api.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_significant_events_api.ts @@ -6,8 +6,11 @@ */ import { useAbortController } from '@kbn/react-hooks'; -import type { StreamQueryKql, System } from '@kbn/streams-schema'; -import { type SignificantEventsGenerateResponse } from '@kbn/streams-schema'; +import type { + StreamQueryKql, + System, + SignificantEventsQueriesGenerationTaskResult, +} from '@kbn/streams-schema'; import { useKibana } from './use_kibana'; import { getLast24HoursTimeRange } from '../util/time_range'; @@ -26,8 +29,15 @@ interface SignificantEventsApi { upsertQuery: (query: StreamQueryKql) => Promise; removeQuery: (id: string) => Promise; bulk: (operations: SignificantEventsApiBulkOperation[]) => Promise; - generate: (connectorId: string, system?: System) => SignificantEventsGenerateResponse; abort: () => void; + getGenerationTask: () => Promise; + scheduleGenerationTask: ( + connectorId: string, + systems?: System[], + sampleDocsSize?: number + ) => Promise; + cancelGenerationTask: () => Promise; + acknowledgeGenerationTask: () => Promise; } export function useSignificantEventsApi({ name }: { name: string }): SignificantEventsApi { @@ -81,31 +91,72 @@ export function useSignificantEventsApi({ name }: { name: string }): Significant }, }); }, - generate: (connectorId: string, system?: System) => { + abort: () => { + abort(); + refresh(); + }, + getGenerationTask: async () => { + return streamsRepositoryClient.fetch( + 'GET /internal/streams/{name}/significant_events/_status', + { + signal, + params: { + path: { name }, + }, + } + ); + }, + scheduleGenerationTask: async ( + connectorId: string, + systems?: System[], + sampleDocsSize?: number + ) => { const { from, to } = getLast24HoursTimeRange(); - return streamsRepositoryClient.stream( - `POST /api/streams/{name}/significant_events/_generate 2023-10-31`, + return streamsRepositoryClient.fetch( + 'POST /internal/streams/{name}/significant_events/_task', { signal, params: { - path: { - name, - }, - query: { + path: { name }, + body: { + action: 'schedule' as const, connectorId, from, to, + sampleDocsSize, + systems, }, + }, + } + ); + }, + cancelGenerationTask: async () => { + return streamsRepositoryClient.fetch( + 'POST /internal/streams/{name}/significant_events/_task', + { + signal, + params: { + path: { name }, body: { - system, + action: 'cancel' as const, }, }, } ); }, - abort: () => { - abort(); - refresh(); + acknowledgeGenerationTask: async () => { + return streamsRepositoryClient.fetch( + 'POST /internal/streams/{name}/significant_events/_task', + { + signal, + params: { + path: { name }, + body: { + action: 'acknowledge' as const, + }, + }, + } + ); }, }; } diff --git a/x-pack/platform/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts b/x-pack/platform/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts index 57b13d23e6694..43478eafc19e3 100644 --- a/x-pack/platform/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts +++ b/x-pack/platform/test/plugin_api_integration/test_suites/task_manager/check_registered_task_types.ts @@ -227,6 +227,7 @@ export default function ({ getService }: FtrProviderContext) { 'slo:bulk-delete-task', 'slo:temp-summary-cleanup-task', 'streams_feature_identification', + 'streams_significant_events_queries_generation', 'task_manager:delete_inactive_background_task_nodes', 'task_manager:invalidate_api_keys', 'task_manager:mark_removed_tasks_as_unrecognized',