Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select a commit. Hold Shift and click to select a range.
ef2e0bf
feat(sigevents queries): add sig events queries task
cesco-f Jan 9, 2026
3cc5133
feat(split): split _task and _status endpoints
cesco-f Jan 12, 2026
6b49c6c
fix(cr): code review
cesco-f Jan 12, 2026
f7f5149
Changes from yarn openapi:bundle
kibanamachine Jan 12, 2026
66ae75c
Merge branch 'main' into sig-events-queries-task
cesco-f Jan 12, 2026
4140fed
Changes from make api-docs
kibanamachine Jan 12, 2026
08e73b4
Changes from node scripts/lint_ts_projects --fix
kibanamachine Jan 12, 2026
32a8351
Changes from node scripts/regenerate_moon_projects.js --update
kibanamachine Jan 12, 2026
93fb7d2
fix(generate endpoint): bring back generate endpoint
cesco-f Jan 12, 2026
de2229b
Changes from yarn openapi:bundle
kibanamachine Jan 12, 2026
b8810cb
Changes from make api-docs
kibanamachine Jan 12, 2026
26e4bb8
fix(internal): new endpoints are internal
cesco-f Jan 12, 2026
c8864ac
fix(description): remove description generation when generating sig e…
cesco-f Jan 12, 2026
8bebf19
refactor(task status): add task status enum
cesco-f Jan 12, 2026
61424af
refactor(plimit): use plimit for sigevents queries generation
cesco-f Jan 12, 2026
d54fcf3
refactor(internal endpoints): move internal endpoints
cesco-f Jan 12, 2026
3b1cb4b
Changes from yarn openapi:bundle
kibanamachine Jan 12, 2026
a4978e1
fix(tokens): remove total tokens used
cesco-f Jan 12, 2026
f2e01e2
refactor(task status): use task status
cesco-f Jan 12, 2026
8ca997f
Changes from make api-docs
kibanamachine Jan 12, 2026
60a5ef7
Changes from node scripts/lint_ts_projects --fix
kibanamachine Jan 12, 2026
ce06b40
Changes from node scripts/regenerate_moon_projects.js --update
kibanamachine Jan 12, 2026
0f4a9f5
fix(types): fix types
cesco-f Jan 12, 2026
69d71fc
Merge branch 'main' into sig-events-queries-task
cesco-f Jan 12, 2026
dc6b4c3
fix(test): fix test
cesco-f Jan 12, 2026
4f1d0ad
Merge branch 'main' into sig-events-queries-task
cesco-f Jan 12, 2026
2faf0b4
fix(privilege): manage privilege for _task endpoint
cesco-f Jan 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ project:
sourceRoot: x-pack/platform/packages/shared/kbn-evals-suite-streams
dependsOn:
- '@kbn/evals'
- '@kbn/sse-utils-client'
- '@kbn/object-utils'
- '@kbn/grok-heuristics'
- '@kbn/tooling-log'
Expand All @@ -35,6 +34,7 @@ dependsOn:
- '@kbn/dev-cli-runner'
- '@kbn/core'
- '@kbn/dissect-heuristics'
- '@kbn/sse-utils-client'
tags:
- functional-tests
- package
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
"exclude": ["target/**/*"],
"kbn_references": [
"@kbn/evals",
"@kbn/sse-utils-client",
"@kbn/object-utils",
"@kbn/grok-heuristics",
"@kbn/tooling-log",
Expand All @@ -24,5 +23,6 @@
"@kbn/dev-cli-runner",
"@kbn/core",
"@kbn/dissect-heuristics",
"@kbn/sse-utils-client",
]
}
2 changes: 2 additions & 0 deletions x-pack/platform/packages/shared/kbn-streams-schema/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ export type {
SignificantEventsPreviewResponse,
SignificantEventsGenerateResponse,
GeneratedSignificantEventQuery,
SignificantEventsQueriesGenerationResult,
SignificantEventsQueriesGenerationTaskResult,
} from './src/api/significant_events';

export { emptyAssets } from './src/helpers/empty_assets';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type { ServerSentEventBase } from '@kbn/sse-utils';
import type { Condition } from '@kbn/streamlang';
import type { ChatCompletionTokenCount } from '@kbn/inference-common';
import type { StreamQueryKql } from '../../queries';
import type { TaskStatus } from '../../tasks/types';

/**
* SignificantEvents Get Response
Expand Down Expand Up @@ -71,10 +72,34 @@ type SignificantEventsGenerateResponse = Observable<
>
>;

/**
 * Aggregated outcome of a significant-events queries generation run: the
 * generated query definitions plus the prompt/completion token usage
 * accumulated across the whole run.
 */
interface SignificantEventsQueriesGenerationResult {
  queries: GeneratedSignificantEventQuery[];
  // Only prompt/completion counts are tracked; total was intentionally dropped.
  tokensUsed: Pick<ChatCompletionTokenCount, 'prompt' | 'completion'>;
}

/**
 * Status-discriminated view of the generation task's state:
 * - not-started / in-progress / stale / cancellation statuses carry no payload;
 * - `Failed` carries the error message;
 * - `Completed` / `Acknowledged` carry the full generation result.
 */
type SignificantEventsQueriesGenerationTaskResult =
  | {
      status:
        | TaskStatus.NotStarted
        | TaskStatus.InProgress
        | TaskStatus.Stale
        | TaskStatus.BeingCanceled
        | TaskStatus.Canceled;
    }
  | {
      status: TaskStatus.Failed;
      error: string;
    }
  | ({
      status: TaskStatus.Completed | TaskStatus.Acknowledged;
    } & SignificantEventsQueriesGenerationResult);

export type {
  SignificantEventsResponse,
  SignificantEventsGetResponse,
  SignificantEventsPreviewResponse,
  GeneratedSignificantEventQuery,
  SignificantEventsGenerateResponse,
  SignificantEventsQueriesGenerationResult,
  SignificantEventsQueriesGenerationTaskResult,
};
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export function cancellableTask(
taskContext.logger.trace(
`Cancellable task check loop for task ${runContext.taskInstance.id}: status is ${task.status}`
);
if (task.status === 'being_canceled') {
if (task.status === TaskStatus.BeingCanceled) {
runContext.abortController.abort();
await taskClient.update({
...task,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type { Logger } from '@kbn/core/server';
import type { TaskDefinitionRegistry } from '@kbn/task-manager-plugin/server';
import type { GetScopedClients } from '../../../routes/types';
import { createStreamsSystemIdentificationTask } from './system_identification';
import { createStreamsSignificantEventsQueriesGenerationTask } from './significant_events_queries_generation';
import type { EbtTelemetryClient } from '../../telemetry';

export interface TaskContext {
Expand All @@ -20,6 +21,7 @@ export interface TaskContext {
/**
 * Collects every Streams background-task definition into a single registry
 * object for registration with the task manager plugin.
 */
export function createTaskDefinitions(taskContext: TaskContext) {
  const systemIdentification = createStreamsSystemIdentificationTask(taskContext);
  const significantEventsGeneration =
    createStreamsSignificantEventsQueriesGenerationTask(taskContext);

  return {
    ...systemIdentification,
    ...significantEventsGeneration,
  } satisfies TaskDefinitionRegistry;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { TaskDefinitionRegistry } from '@kbn/task-manager-plugin/server';
import { isInferenceProviderError } from '@kbn/inference-common';
import {
TaskStatus,
getStreamTypeFromDefinition,
type SignificantEventsQueriesGenerationResult,
type System,
} from '@kbn/streams-schema';
import pLimit from 'p-limit';
import { formatInferenceProviderError } from '../../../routes/utils/create_connector_sse_error';
import type { TaskContext } from '.';
import type { TaskParams } from '../types';
import { PromptsConfigService } from '../../saved_objects/significant_events/prompts_config_service';
import { cancellableTask } from '../cancellable_task';
import { generateSignificantEventDefinitions } from '../../significant_events/generate_significant_events';

/**
 * Parameters persisted with a significant-events queries generation task.
 */
export interface SignificantEventsQueriesGenerationTaskParams {
  // ID of the inference connector used for the LLM calls.
  connectorId: string;
  // Start of the time range to generate from (presumably epoch ms — confirm against callers).
  start: number;
  // End of the time range to generate from (presumably epoch ms — confirm against callers).
  end: number;
  // Systems to generate queries for; when omitted, one generation runs over all data.
  systems?: System[];
  // Optional cap on the number of sample documents fed to the LLM.
  sampleDocsSize?: number;
}

// Task-manager type id under which this task is registered.
export const SIGNIFICANT_EVENTS_QUERIES_GENERATION_TASK_TYPE =
  'streams_significant_events_queries_generation';

/**
 * Creates the task-manager definition for the significant-events queries
 * generation task. The runner fans out one LLM generation per system (with
 * bounded concurrency), merges the results into a single payload, reports
 * telemetry, and persists the outcome (Completed or Failed) on the task
 * document via the task client. Wrapped in `cancellableTask` so an external
 * cancellation aborts the in-flight generations.
 */
export function createStreamsSignificantEventsQueriesGenerationTask(taskContext: TaskContext) {
  return {
    [SIGNIFICANT_EVENTS_QUERIES_GENERATION_TASK_TYPE]: {
      createTaskRunner: (runContext) => {
        return {
          run: cancellableTask(
            async () => {
              // The task impersonates the original caller; without the fake
              // request we cannot build user-scoped clients.
              if (!runContext.fakeRequest) {
                throw new Error('Request is required to run this task');
              }

              const { connectorId, start, end, systems, sampleDocsSize, _task } = runContext
                .taskInstance.params as TaskParams<SignificantEventsQueriesGenerationTaskParams>;
              const { stream: name } = _task;

              const { taskClient, scopedClusterClient, streamsClient, inferenceClient, soClient } =
                await taskContext.getScopedClients({
                  request: runContext.fakeRequest,
                });

              try {
                const stream = await streamsClient.getStream(name);

                const esClient = scopedClusterClient.asCurrentUser;

                const promptsConfigService = new PromptsConfigService({
                  soClient,
                  logger: taskContext.logger,
                });

                const { significantEventsPromptOverride } = await promptsConfigService.getPrompt();

                // If no systems are passed, generate once for all data;
                // otherwise generate per system with a concurrency limit.
                const systemsToProcess: Array<System | undefined> =
                  systems && systems.length > 0 ? systems : [undefined];

                // Bound parallelism to avoid overwhelming the LLM provider.
                const CONCURRENCY_LIMIT = 3;
                const limiter = pLimit(CONCURRENCY_LIMIT);

                const resultsArray = await Promise.all(
                  systemsToProcess.map((system) =>
                    limiter(() =>
                      generateSignificantEventDefinitions(
                        {
                          definition: stream,
                          connectorId,
                          start,
                          end,
                          system,
                          sampleDocsSize,
                          systemPromptOverride: significantEventsPromptOverride,
                        },
                        {
                          inferenceClient,
                          esClient,
                          logger: taskContext.logger.get('significant_events_generation'),
                          // Tie each generation to the task's abort controller
                          // so cancellation stops in-flight LLM calls.
                          signal: runContext.abortController.signal,
                        }
                      )
                    )
                  )
                );

                // Combine results from all parallel generations in a single pass.
                const combinedResults =
                  resultsArray.reduce<SignificantEventsQueriesGenerationResult>(
                    (acc, result) => {
                      acc.queries.push(...result.queries);
                      acc.tokensUsed.prompt += result.tokensUsed.prompt;
                      acc.tokensUsed.completion += result.tokensUsed.completion;
                      return acc;
                    },
                    { queries: [], tokensUsed: { prompt: 0, completion: 0 } }
                  );

                taskContext.telemetry.trackSignificantEventsQueriesGenerated({
                  count: combinedResults.queries.length,
                  systems_count: systems?.length ?? 0,
                  stream_name: stream.name,
                  stream_type: getStreamTypeFromDefinition(stream),
                  input_tokens_used: combinedResults.tokensUsed.prompt,
                  output_tokens_used: combinedResults.tokensUsed.completion,
                });

                await taskClient.update<
                  SignificantEventsQueriesGenerationTaskParams,
                  SignificantEventsQueriesGenerationResult
                >({
                  ..._task,
                  status: TaskStatus.Completed,
                  task: {
                    params: {
                      connectorId,
                      start,
                      end,
                      systems,
                      sampleDocsSize,
                    },
                    payload: combinedResults,
                  },
                });
              } catch (error) {
                // Derive a message from the error itself first, so a failure
                // while enriching it cannot mask the original cause.
                let errorMessage: string =
                  error instanceof Error ? error.message : String(error);

                if (isInferenceProviderError(error)) {
                  try {
                    // Connector info is only needed to enrich provider errors;
                    // fetch it lazily and tolerate failure so the Failed
                    // status update below always runs.
                    const connector = await inferenceClient.getConnectorById(connectorId);
                    errorMessage = formatInferenceProviderError(error, connector);
                  } catch (enrichError) {
                    taskContext.logger.debug(
                      `Failed to enrich inference provider error with connector info: ${enrichError}`
                    );
                  }
                }

                // Cancellation surfaces as an abort error; the cancellable
                // wrapper owns the canceled status, so don't mark it failed.
                if (
                  errorMessage.includes('ERR_CANCELED') ||
                  errorMessage.includes('Request was aborted')
                ) {
                  return;
                }

                taskContext.logger.error(
                  `Task ${runContext.taskInstance.id} failed: ${errorMessage}`
                );

                await taskClient.update<SignificantEventsQueriesGenerationTaskParams>({
                  ..._task,
                  status: TaskStatus.Failed,
                  task: {
                    params: {
                      connectorId,
                      start,
                      end,
                      systems,
                      sampleDocsSize,
                    },
                    error: errorMessage,
                  },
                });
              }
            },
            runContext,
            taskContext
          ),
        };
      },
    },
  } satisfies TaskDefinitionRegistry;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@
import type { AnalyticsServiceSetup } from '@kbn/core-analytics-server';
import type {
StreamEndpointLatencyProps,
StreamsSignificantEventsQueriesGeneratedProps,
StreamsStateErrorProps,
StreamsSystemIdentificationIdentifiedProps,
} from './types';
import {
STREAMS_ENDPOINT_LATENCY_EVENT,
STREAMS_SIGNIFICANT_EVENTS_QUERIES_GENERATED_EVENT_TYPE,
STREAMS_STATE_ERROR_EVENT,
STREAMS_SYSTEM_IDENTIFICATION_IDENTIFIED_EVENT_TYPE,
} from './constants';
Expand Down Expand Up @@ -65,4 +67,10 @@ export class EbtTelemetryClient {
  /**
   * Reports the "system identification identified" EBT event with the
   * supplied properties.
   */
  public trackSystemsIdentified(params: StreamsSystemIdentificationIdentifiedProps) {
    this.analytics.reportEvent(STREAMS_SYSTEM_IDENTIFICATION_IDENTIFIED_EVENT_TYPE, params);
  }

  /**
   * Reports the "significant events queries generated" EBT event with the
   * supplied properties (query counts, token usage, stream metadata).
   */
  public trackSignificantEventsQueriesGenerated(
    params: StreamsSignificantEventsQueriesGeneratedProps
  ) {
    this.analytics.reportEvent(STREAMS_SIGNIFICANT_EVENTS_QUERIES_GENERATED_EVENT_TYPE, params);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ const STREAMS_ENDPOINT_LATENCY_EVENT = 'streams-endpoint-latency';
// EBT event type identifiers for Streams telemetry.
const STREAMS_STATE_ERROR_EVENT = 'streams-state-error';
const STREAMS_SYSTEM_IDENTIFICATION_IDENTIFIED_EVENT_TYPE =
  'streams-system-identification-identified';
// Emitted when a significant-events queries generation run completes.
const STREAMS_SIGNIFICANT_EVENTS_QUERIES_GENERATED_EVENT_TYPE =
  'streams-significant-events-queries-generated';

export {
  STREAMS_ENDPOINT_LATENCY_EVENT,
  STREAMS_STATE_ERROR_EVENT,
  STREAMS_SYSTEM_IDENTIFICATION_IDENTIFIED_EVENT_TYPE,
  STREAMS_SIGNIFICANT_EVENTS_QUERIES_GENERATED_EVENT_TYPE,
};
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import {
STREAMS_ENDPOINT_LATENCY_EVENT,
STREAMS_STATE_ERROR_EVENT,
STREAMS_SYSTEM_IDENTIFICATION_IDENTIFIED_EVENT_TYPE,
STREAMS_SIGNIFICANT_EVENTS_QUERIES_GENERATED_EVENT_TYPE,
} from './constants';
import {
streamsEndpointLatencySchema,
streamsStateErrorSchema,
streamsSystemIdentificationIdentifiedSchema,
streamsSignificantEventsQueriesGeneratedSchema,
} from './schemas';

const streamsEndpointLatencyEventType = {
Expand All @@ -31,8 +33,14 @@ const streamsSystemIdentificationIdentifiedEventType = {
schema: streamsSystemIdentificationIdentifiedSchema,
};

// Pairs the significant-events-generated event type id with its payload schema
// for registration with the analytics service.
const streamsSignificantEventsGeneratedEventType = {
  eventType: STREAMS_SIGNIFICANT_EVENTS_QUERIES_GENERATED_EVENT_TYPE,
  schema: streamsSignificantEventsQueriesGeneratedSchema,
};

export {
  streamsEndpointLatencyEventType,
  streamsStateErrorEventType,
  streamsSystemIdentificationIdentifiedEventType,
  streamsSignificantEventsGeneratedEventType,
};
Loading