Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface InsightsDiscoveryTaskResult {
export interface InsightsDiscoveryTaskParams {
/** When provided, only generate insights for these stream names. Otherwise all streams are used. */
streamNames?: string[];
/** Optional connector ID override. When omitted the server resolves the connector from the inference feature registry. */
connectorId?: string;
}

export const STREAMS_INSIGHTS_DISCOVERY_TASK_TYPE = 'streams_insights_discovery';
Expand All @@ -47,8 +48,11 @@ export function createStreamsInsightsDiscoveryTask(taskContext: TaskContext) {
}
const { fakeRequest } = runContext;

const { streamNames, _task } = runContext.taskInstance
.params as TaskParams<InsightsDiscoveryTaskParams>;
const {
streamNames,
connectorId: connectorIdOverride,
_task,
} = runContext.taskInstance.params as TaskParams<InsightsDiscoveryTaskParams>;

const {
taskClient,
Expand All @@ -62,12 +66,14 @@ export function createStreamsInsightsDiscoveryTask(taskContext: TaskContext) {
});

const taskLogger = taskContext.logger.get('insights_discovery');
const connectorId = await resolveConnectorForFeature({
searchInferenceEndpoints: taskContext.server.searchInferenceEndpoints,
featureId: STREAMS_SIG_EVENTS_DISCOVERY_INFERENCE_FEATURE_ID,
featureName: 'discovery',
request: fakeRequest,
});
const connectorId =
connectorIdOverride ??
(await resolveConnectorForFeature({
searchInferenceEndpoints: taskContext.server.searchInferenceEndpoints,
featureId: STREAMS_SIG_EVENTS_DISCOVERY_INFERENCE_FEATURE_ID,
featureName: 'discovery',
request: fakeRequest,
}));
taskLogger.debug(`Using connector ${connectorId} for discovery`);
const boundInferenceClient = inferenceClient.bindTo({ connectorId });

Expand Down Expand Up @@ -116,7 +122,7 @@ export function createStreamsInsightsDiscoveryTask(taskContext: TaskContext) {

await taskClient.complete<InsightsDiscoveryTaskParams, InsightsDiscoveryTaskResult>(
_task,
{ streamNames },
{ streamNames, connectorId: connectorIdOverride },
{ insights, tokensUsed: result.tokens_used }
);
} catch (error) {
Expand Down Expand Up @@ -144,7 +150,7 @@ export function createStreamsInsightsDiscoveryTask(taskContext: TaskContext) {

await taskClient.fail<InsightsDiscoveryTaskParams>(
_task,
{ streamNames },
{ streamNames, connectorId: connectorIdOverride },
errorMessage
);
return getDeleteTaskRunResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export interface SignificantEventsQueriesGenerationTaskParams {
end: number;
sampleDocsSize?: number;
streamName: string;
connectorId?: string;
}

export const SIGNIFICANT_EVENTS_QUERIES_GENERATION_TASK_TYPE =
Expand All @@ -49,7 +50,14 @@ export function createStreamsSignificantEventsQueriesGenerationTask(taskContext:
}
const { fakeRequest } = runContext;

const { start, end, sampleDocsSize, streamName, _task } = runContext.taskInstance
const {
start,
end,
sampleDocsSize,
streamName,
connectorId: connectorIdOverride,
_task,
} = runContext.taskInstance
.params as TaskParams<SignificantEventsQueriesGenerationTaskParams>;

const {
Expand All @@ -64,12 +72,14 @@ export function createStreamsSignificantEventsQueriesGenerationTask(taskContext:
});

const taskLogger = taskContext.logger.get('significant_events_queries_generation');
const connectorId = await resolveConnectorForFeature({
searchInferenceEndpoints: taskContext.server.searchInferenceEndpoints,
featureId: STREAMS_SIG_EVENTS_KI_QUERY_GENERATION_INFERENCE_FEATURE_ID,
featureName: 'query generation',
request: fakeRequest,
});
const connectorId =
connectorIdOverride ??
(await resolveConnectorForFeature({
searchInferenceEndpoints: taskContext.server.searchInferenceEndpoints,
featureId: STREAMS_SIG_EVENTS_KI_QUERY_GENERATION_INFERENCE_FEATURE_ID,
featureName: 'query generation',
request: fakeRequest,
}));
taskLogger.debug(`Using connector ${connectorId} for rule generation`);

try {
Expand Down Expand Up @@ -113,7 +123,11 @@ export function createStreamsSignificantEventsQueriesGenerationTask(taskContext:
await taskClient.complete<
SignificantEventsQueriesGenerationTaskParams,
SignificantEventsQueriesGenerationResult
>(_task, { start, end, sampleDocsSize, streamName }, result);
>(
_task,
{ start, end, sampleDocsSize, streamName, connectorId: connectorIdOverride },
result
);
} catch (error) {
if (isDefinitionNotFoundError(error)) {
taskContext.logger.debug(
Expand Down Expand Up @@ -146,7 +160,13 @@ export function createStreamsSignificantEventsQueriesGenerationTask(taskContext:

await taskClient.fail<SignificantEventsQueriesGenerationTaskParams>(
_task,
{ start, end, sampleDocsSize, streamName },
{
start,
end,
sampleDocsSize,
streamName,
connectorId: connectorIdOverride,
},
errorMessage
);

Expand Down
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@CoenWarmer should this file be owned by the onboarding team?

Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ export interface FeaturesIdentificationTaskParams {
start: number;
end: number;
streamName: string;
connectorId?: string;
}

export const FEATURES_IDENTIFICATION_TASK_TYPE = 'streams_features_identification';
Expand All @@ -355,8 +356,13 @@ export function createStreamsFeaturesIdentificationTask(taskContext: TaskContext
}
const { fakeRequest } = runContext;

const { start, end, streamName, _task } = runContext.taskInstance
.params as TaskParams<FeaturesIdentificationTaskParams>;
const {
start,
end,
streamName,
connectorId: connectorIdOverride,
_task,
} = runContext.taskInstance.params as TaskParams<FeaturesIdentificationTaskParams>;

const runId = uuid();
const trackEmptyTelemetry = (state: 'canceled' | 'failure') => {
Expand Down Expand Up @@ -395,12 +401,14 @@ export function createStreamsFeaturesIdentificationTask(taskContext: TaskContext
});

const taskLogger = taskContext.logger.get('features_identification', streamName);
const connectorId = await resolveConnectorForFeature({
searchInferenceEndpoints: taskContext.server.searchInferenceEndpoints,
featureId: STREAMS_SIG_EVENTS_KI_EXTRACTION_INFERENCE_FEATURE_ID,
featureName: 'knowledge indicator extraction',
request: fakeRequest,
});
const connectorId =
connectorIdOverride ??
(await resolveConnectorForFeature({
searchInferenceEndpoints: taskContext.server.searchInferenceEndpoints,
featureId: STREAMS_SIG_EVENTS_KI_EXTRACTION_INFERENCE_FEATURE_ID,
featureName: 'knowledge indicator extraction',
request: fakeRequest,
}));
taskLogger.debug(`Using connector ${connectorId} for knowledge indicator extraction`);

let hasTrackedIteration = false;
Expand Down Expand Up @@ -509,7 +517,7 @@ export function createStreamsFeaturesIdentificationTask(taskContext: TaskContext

await taskClient.complete<FeaturesIdentificationTaskParams, IdentifyFeaturesResult>(
_task,
{ start, end, streamName },
{ start, end, streamName, connectorId: connectorIdOverride },
{
features: allFeatures,
durationMs,
Expand Down Expand Up @@ -566,7 +574,7 @@ export function createStreamsFeaturesIdentificationTask(taskContext: TaskContext

await taskClient.fail<FeaturesIdentificationTaskParams, IdentifyFeaturesResult>(
_task,
{ start, end, streamName },
{ start, end, streamName, connectorId: connectorIdOverride },
errorMessage,
{
features: [],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ export interface OnboardingTaskParams {
to: number;
steps: OnboardingStep[];
saveQueries: boolean;
connectors?: {
features?: string;
queries?: string;
};
}

export const STREAMS_ONBOARDING_TASK_TYPE = 'streams_onboarding';
Expand All @@ -51,6 +55,31 @@ export function getOnboardingTaskId(streamName: string, saveQueries: boolean = t
return saveQueries ? base : `${base}_no_save_queries`;
}

const FEATURES_IDENTIFICATION_RECENCY_MS = 12 * 60 * 60 * 1000; // 12 hours

/**
 * Checks whether a previously run features identification task can be reused.
 *
 * A task counts as up to date only when it has completed and its
 * `last_completed_at` timestamp is within FEATURES_IDENTIFICATION_RECENCY_MS
 * of now. Any other state (not completed, or no completion timestamp) yields
 * `false`.
 */
async function areFeaturesUpToDate({
  taskClient,
  featuresTaskId,
}: {
  taskClient: TaskClient<StreamsTaskType>;
  featuresTaskId: string;
}) {
  const task = await taskClient.get<FeaturesIdentificationTaskParams, IdentifyFeaturesResult>(
    featuresTaskId
  );

  // Only a completed task with a recorded completion time can be considered fresh.
  if (task.status !== TaskStatus.Completed || !task.last_completed_at) {
    return false;
  }

  const ageMs = Date.now() - new Date(task.last_completed_at).getTime();
  return ageMs < FEATURES_IDENTIFICATION_RECENCY_MS;
}

export function createStreamsOnboardingTask(taskContext: TaskContext) {
return {
[STREAMS_ONBOARDING_TASK_TYPE]: {
Expand All @@ -64,8 +93,8 @@ export function createStreamsOnboardingTask(taskContext: TaskContext) {
}
const { fakeRequest } = runContext;

const { streamName, from, to, steps, saveQueries, _task } = runContext.taskInstance
.params as TaskParams<OnboardingTaskParams>;
const { streamName, from, to, steps, saveQueries, connectors, _task } = runContext
.taskInstance.params as TaskParams<OnboardingTaskParams>;

const { taskClient, queryClient, streamsClient } = await taskContext.getScopedClients(
{
Expand All @@ -83,21 +112,34 @@ export function createStreamsOnboardingTask(taskContext: TaskContext) {
switch (step) {
case OnboardingStep.FeaturesIdentification: {
const featuresTaskId = getFeaturesIdentificationTaskId(streamName);

await scheduleFeaturesIdentificationTask(
{
start: from,
end: to,
streamName,
},
taskClient,
fakeRequest
);

featuresTaskResult = await waitForSubtask<
FeaturesIdentificationTaskParams,
IdentifyFeaturesResult
>(featuresTaskId, runContext.taskInstance.id, taskClient);
const isFeaturesOnlyStep =
steps.length === 1 && steps[0] === OnboardingStep.FeaturesIdentification;

if (
!isFeaturesOnlyStep &&
(await areFeaturesUpToDate({ taskClient, featuresTaskId }))
) {
featuresTaskResult = await taskClient.getStatus<
FeaturesIdentificationTaskParams,
IdentifyFeaturesResult
>(featuresTaskId);
} else {
await scheduleFeaturesIdentificationTask(
{
start: from,
end: to,
streamName,
connectorId: connectors?.features,
},
taskClient,
fakeRequest
);

featuresTaskResult = await waitForSubtask<
FeaturesIdentificationTaskParams,
IdentifyFeaturesResult
>(featuresTaskId, runContext.taskInstance.id, taskClient);
}

if (featuresTaskResult.status !== TaskStatus.Completed) {
return;
Expand All @@ -111,6 +153,7 @@ export function createStreamsOnboardingTask(taskContext: TaskContext) {
start: from,
end: to,
streamName,
connectorId: connectors?.queries,
},
taskClient,
fakeRequest
Expand Down Expand Up @@ -140,7 +183,14 @@ export function createStreamsOnboardingTask(taskContext: TaskContext) {

await taskClient.complete<OnboardingTaskParams, OnboardingResult>(
_task,
{ streamName, from, to, steps, saveQueries },
{
streamName,
from,
to,
steps,
saveQueries,
connectors,
},
{ featuresTaskResult, queriesTaskResult }
);
} catch (error) {
Expand Down Expand Up @@ -168,6 +218,7 @@ export function createStreamsOnboardingTask(taskContext: TaskContext) {
to,
steps,
saveQueries,
connectors,
},
errorMessage
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ const insightsTaskRoute = createServerRoute({
.array(z.string())
.describe('List of stream names to generate insights for.')
.optional(),
connectorId: z
.string()
.optional()
.describe(
'Optional connector ID override. When omitted the server resolves the connector from the inference feature registry.'
),
}),
}),
handler: async ({ params, request, getScopedClients, server }): Promise<InsightsTaskResult> => {
Expand All @@ -67,6 +73,7 @@ const insightsTaskRoute = createServerRoute({
taskId: STREAMS_INSIGHTS_DISCOVERY_TASK_TYPE,
params: {
streamNames: body.streamNames,
connectorId: body.connectorId,
},
request,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ export const onboardingTaskRoute = createServerRoute({
.describe(
'Optional list of steps to perform as part of stream onboarding in the specified sequence. By default it will execute all steps.'
),
connectors: z
.object({
features: z.string().optional().describe('Connector ID for features identification.'),
queries: z.string().optional().describe('Connector ID for queries generation.'),
})
.optional()
.describe(
'Optional per-step connector overrides. When omitted the server resolves connectors from the inference feature registry.'
),
}),
}),
handler: async ({ params, request, getScopedClients, server }): Promise<OnboardingTaskResult> => {
Expand Down Expand Up @@ -85,6 +94,7 @@ export const onboardingTaskRoute = createServerRoute({
to: body.to,
steps: body.steps,
saveQueries,
connectors: body.connectors,
},
request,
},
Expand Down
1 change: 1 addition & 0 deletions x-pack/platform/plugins/shared/streams_app/moon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ dependsOn:
- '@kbn/react-query'
- '@kbn/timerange'
- '@kbn/inference-common'
- '@kbn/inference-connectors'
- '@kbn/esql'
- '@kbn/inference-endpoint-ui-common'
- '@kbn/stack-connectors-plugin'
Expand Down
Loading
Loading