Merged
3 changes: 3 additions & 0 deletions x-pack/platform/packages/shared/kbn-streams-schema/index.ts
@@ -271,4 +271,7 @@ export {
   STREAMS_SIG_EVENTS_KI_EXTRACTION_INFERENCE_FEATURE_ID,
   STREAMS_SIG_EVENTS_KI_QUERY_GENERATION_INFERENCE_FEATURE_ID,
   STREAMS_SIG_EVENTS_DISCOVERY_INFERENCE_FEATURE_ID,
+  STREAMS_INFERENCE_PARENT_FEATURE_ID,
+  STREAMS_PARTITIONING_SUGGESTIONS_INFERENCE_FEATURE_ID,
+  STREAMS_PROCESSING_SUGGESTIONS_INFERENCE_FEATURE_ID,
 } from './src/inference_feature_ids';
11 changes: 11 additions & 0 deletions x-pack/platform/packages/shared/kbn-streams-schema/src/inference_feature_ids.ts
@@ -20,3 +20,14 @@ export const STREAMS_SIG_EVENTS_KI_QUERY_GENERATION_INFERENCE_FEATURE_ID =
 /** Discovery and significant event generation. */
 export const STREAMS_SIG_EVENTS_DISCOVERY_INFERENCE_FEATURE_ID =
   'streams_sig_events_discovery' as const;
+
+/** Parent feature for Streams (Inference Feature Registry). */
+export const STREAMS_INFERENCE_PARENT_FEATURE_ID = 'streams' as const;
+
+/** Partitioning suggestions. */
+export const STREAMS_PARTITIONING_SUGGESTIONS_INFERENCE_FEATURE_ID =
+  'streams_partitioning_suggestions' as const;
+
+/** Processing suggestions (pipelines, grok/dissect processors). */
+export const STREAMS_PROCESSING_SUGGESTIONS_INFERENCE_FEATURE_ID =
+  'streams_processing_suggestions' as const;
5 changes: 5 additions & 0 deletions x-pack/platform/plugins/shared/streams/server/plugin.ts
@@ -58,6 +58,7 @@ import { baseFields } from './lib/streams/component_templates/logs_layer';
 import { ecsBaseFields } from './lib/streams/component_templates/logs_ecs_layer';
 import { registerStreamsAgentBuilder } from './agent_builder/register';
 import { registerSignificantEventsInferenceFeatures } from './register_significant_events_inference_features';
+import { registerSuggestionsInferenceFeatures } from './register_suggestions_inference_features';
 import { PatternExtractionService } from './lib/pattern_extraction/pattern_extraction_service';
 import { createStreamsSettingsStorageClient } from './lib/streams/storage/streams_settings_storage_client';
 import {
@@ -133,6 +134,10 @@ export class StreamsPlugin
       plugins.searchInferenceEndpoints,
       this.logger.get('inference-features')
     );
+    registerSuggestionsInferenceFeatures(
+      plugins.searchInferenceEndpoints,
+      this.logger.get('inference-features')
+    );
 
     const inferenceResolver = createInferenceResolver(this.logger);
 
104 changes: 104 additions & 0 deletions x-pack/platform/plugins/shared/streams/server/register_suggestions_inference_features.ts
@@ -0,0 +1,104 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import type { Logger } from '@kbn/core/server';
+import type { SearchInferenceEndpointsPluginSetup } from '@kbn/search-inference-endpoints/server';
+import { i18n } from '@kbn/i18n';
+import {
+  STREAMS_INFERENCE_PARENT_FEATURE_ID,
+  STREAMS_PARTITIONING_SUGGESTIONS_INFERENCE_FEATURE_ID,
+  STREAMS_PROCESSING_SUGGESTIONS_INFERENCE_FEATURE_ID,
+} from '@kbn/streams-schema';
+
+/**
+ * Registers Streams parent + child suggestion features with the Inference Feature Registry.
+ * No-op when the searchInferenceEndpoints plugin is unavailable.
+ */
+export function registerSuggestionsInferenceFeatures(
+  searchInferenceEndpoints: SearchInferenceEndpointsPluginSetup | undefined,
+  logger: Logger
+): void {
+  if (!searchInferenceEndpoints) {
+    return;
+  }
+
+  const { register } = searchInferenceEndpoints.features;
+
+  const parentResult = register({
+    featureId: STREAMS_INFERENCE_PARENT_FEATURE_ID,
+    featureName: i18n.translate('xpack.streams.inferenceFeature.suggestionsParentName', {
+      defaultMessage: 'Streams',
+    }),
+    featureDescription: i18n.translate(
+      'xpack.streams.inferenceFeature.suggestionsParentDescription',
+      {
+        defaultMessage: 'AI models used for Streams suggestions.',
+      }
+    ),
+    taskType: 'chat_completion',
+    recommendedEndpoints: [],
+  });
+  if (parentResult.ok) {
+    logger.debug(`Registered parent inference feature "${STREAMS_INFERENCE_PARENT_FEATURE_ID}"`);
+  } else {
+    logger.warn(
+      `Failed to register inference feature "${STREAMS_INFERENCE_PARENT_FEATURE_ID}": ${parentResult.error}`
+    );
+  }
+
+  const children: Array<{
+    featureId: string;
+    featureName: string;
+    featureDescription: string;
+    recommendedEndpoints: string[];
+  }> = [
+    {
+      featureId: STREAMS_PARTITIONING_SUGGESTIONS_INFERENCE_FEATURE_ID,
+      featureName: i18n.translate('xpack.streams.inferenceFeature.partitionSuggestionsName', {
+        defaultMessage: 'Partitioning suggestions',
+      }),
+      featureDescription: i18n.translate(
+        'xpack.streams.inferenceFeature.partitionSuggestionsDescription',
+        {
+          defaultMessage: 'Model used to suggest partitions.',
+        }
+      ),
+      recommendedEndpoints: [],
+    },
+    {
+      featureId: STREAMS_PROCESSING_SUGGESTIONS_INFERENCE_FEATURE_ID,
+      featureName: i18n.translate('xpack.streams.inferenceFeature.processingSuggestionsName', {
+        defaultMessage: 'Processing suggestions',
+      }),
+      featureDescription: i18n.translate(
+        'xpack.streams.inferenceFeature.processingSuggestionsDescription',
+        {
+          defaultMessage: 'Model used to suggest processing pipelines and grok/dissect processors.',
+        }
+      ),
+      recommendedEndpoints: [],
+    },
+  ];
+
+  for (const child of children) {
+    const childResult = register({
+      featureId: child.featureId,
+      parentFeatureId: STREAMS_INFERENCE_PARENT_FEATURE_ID,
+      featureName: child.featureName,
+      featureDescription: child.featureDescription,
+      taskType: 'chat_completion',
+      recommendedEndpoints: child.recommendedEndpoints,
+    });
+    if (childResult.ok) {
+      logger.debug(`Registered child inference feature "${child.featureId}"`);
+    } else {
+      logger.warn(
+        `Failed to register inference feature "${child.featureId}": ${childResult.error}`
+      );
+    }
+  }
+}
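
For context on the error handling above: `features.register` is treated as a non-throwing API — the code only ever reads `result.ok` and, in the failure branch, `result.error`. The real types live in `@kbn/search-inference-endpoints/server`; the sketch below only illustrates the assumed shape (the `RegisterResult` name and `logResult` helper are hypothetical, not part of this PR):

```ts
// Sketch only, assuming a discriminated result type like the one the registry returns.
type RegisterResult = { ok: true } | { ok: false; error: string };

// Narrowing on `ok` is what makes reading `error` safe in the else branch,
// mirroring the parent/child handling in the file above.
function logResult(result: RegisterResult, featureId: string): void {
  if (result.ok) {
    console.debug(`Registered inference feature "${featureId}"`);
  } else {
    console.warn(`Failed to register inference feature "${featureId}": ${result.error}`);
  }
}
```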
@@ -9,25 +9,6 @@ import { z } from '@kbn/zod/v4';
 import { STREAMS_API_PRIVILEGES } from '../../../../common/constants';
 import { createServerRoute } from '../../create_server_route';
 
-export const getConnectorsRoute = createServerRoute({
-  endpoint: 'GET /internal/streams/connectors',
-  options: {
-    access: 'internal',
-    summary: 'Get GenAI connectors',
-    description: 'Fetches all available GenAI connectors for AI features',
-  },
-  security: {
-    authz: {
-      requiredPrivileges: [STREAMS_API_PRIVILEGES.read],
-    },
-  },
-  handler: async ({ request, server }) => {
-    const connectors = await server.inference.getConnectorList(request);
-
-    return { connectors };
-  },
-});
-
 export const getConnectorByIdRoute = createServerRoute({
   endpoint: 'GET /internal/streams/connectors/{connectorId}',
   options: {
@@ -49,6 +30,5 @@ export const getConnectorByIdRoute = createServerRoute({
 });
 
 export const connectorRoutes = {
-  ...getConnectorsRoute,
   ...getConnectorByIdRoute,
 };
@@ -79,14 +79,16 @@ export const suggestPartitionsRoute = createServerRoute({
       request,
     });
 
+    const { connector_id: connectorId } = params.body;
+
     const stream = await streamsClient.getStream(params.path.name);
     if (!Streams.WiredStream.Definition.is(stream)) {
       throw new StatusError('Partitioning suggestions are only available for wired streams', 400);
     }
 
     const partitionsPromise = partitionStream({
       definition: stream,
-      inferenceClient: inferenceClient.bindTo({ connectorId: params.body.connector_id }),
+      inferenceClient: inferenceClient.bindTo({ connectorId }),
       esClient: scopedClusterClient.asCurrentUser,
       logger,
       start: params.body.start,
@@ -91,9 +91,7 @@ export const suggestProcessingPipelineRoute = createServerRoute({
     telemetry,
   }): Promise<SuggestProcessingPipelineResponse> => {
     const log = logger.get('suggestProcessingPipeline');
-    log.debug(
-      `Request received (stream=${params.path.name} connectorId=${params.body.connector_id})`
-    );
+    const { connector_id: connectorId } = params.body;
 
     // Wrap entire logic in Observable so errors can be sent as SSE events
     return from(
@@ -108,6 +106,8 @@
         const { inferenceClient, scopedClusterClient, streamsClient, fieldsMetadataClient } =
           await getScopedClients({ request });
 
+        log.debug(`Request received (stream=${params.path.name} connectorId=${connectorId})`);
+
         const stream = await streamsClient.getStream(params.path.name);
         if (!Streams.ingest.all.Definition.is(stream)) {
           throw new StatusError(
@@ -134,15 +134,15 @@ export const suggestProcessingPipelineRoute = createServerRoute({
        > = [];
 
        log.debug(
-         `Scheduling parallel grok + dissect extraction (stream=${stream.name} messages=${messages.length} fieldName=${fieldName} connectorId=${params.body.connector_id})`
+         `Scheduling parallel grok + dissect extraction (stream=${stream.name} messages=${messages.length} fieldName=${fieldName} connectorId=${connectorId})`
        );
 
        candidatePromises.push(
          processGrokPatterns({
            messages,
            fieldName,
            streamName: stream.name,
-           connectorId: params.body.connector_id,
+           connectorId,
            documents: params.body.documents,
            patternExtractionService,
            inferenceClient,
@@ -159,7 +159,7 @@ export const suggestProcessingPipelineRoute = createServerRoute({
            messages,
            fieldName,
            streamName: stream.name,
-           connectorId: params.body.connector_id,
+           connectorId,
            documents: params.body.documents,
            patternExtractionService,
            inferenceClient,
@@ -185,13 +185,13 @@ export const suggestProcessingPipelineRoute = createServerRoute({
           const { reason } = result;
           if (isNoLLMSuggestionsError(reason)) {
             log.debug(
-              `No LLM suggestions available (stream=${stream.name} connectorId=${params.body.connector_id})`
+              `No LLM suggestions available (stream=${stream.name} connectorId=${connectorId})`
             );
           } else {
             const meta = formatInferenceErrorMeta(reason);
             log.error(
               `Candidate failed (stream=${stream.name}` +
-                ` connectorId=${params.body.connector_id}${meta}): ${getErrorMessage(reason)}`
+                ` connectorId=${connectorId}${meta}): ${getErrorMessage(reason)}`
             );
           }
         }
@@ -216,7 +216,7 @@ export const suggestProcessingPipelineRoute = createServerRoute({
 
         const result = await suggestProcessingPipeline({
           definition: stream,
-          inferenceClient: inferenceClient.bindTo({ connectorId: params.body.connector_id }),
+          inferenceClient: inferenceClient.bindTo({ connectorId }),
           parsingProcessor,
           maxSteps,
           signal: abortController.signal,
@@ -237,11 +237,11 @@ export const suggestProcessingPipelineRoute = createServerRoute({
 
         const durationMs = Date.now() - startTime;
         log.debug(
-          `Processing pipeline generated (stream=${stream.name} connectorId=${
-            params.body.connector_id
-          } durationMs=${durationMs} steps=${result.metadata.stepsUsed} hasPipeline=${
-            result.pipeline !== null
-          })`
+          `Processing pipeline generated (stream=${
+            stream.name
+          } connectorId=${connectorId} durationMs=${durationMs} steps=${
+            result.metadata.stepsUsed
+          } hasPipeline=${result.pipeline !== null})`
         );
 
         telemetry.trackProcessingPipelineSuggested({
@@ -262,7 +262,7 @@ export const suggestProcessingPipelineRoute = createServerRoute({
       catchError((error) => {
         if (isNoLLMSuggestionsError(error)) {
           log.debug(
-            `No LLM suggestions available for pipeline generation (stream=${params.path.name} connectorId=${params.body.connector_id})`
+            `No LLM suggestions available for pipeline generation (stream=${params.path.name} connectorId=${connectorId})`
           );
           // Return null pipeline instead of error - frontend will handle this gracefully
           return [
@@ -275,9 +275,7 @@ export const suggestProcessingPipelineRoute = createServerRoute({
         const errorMessage = getErrorMessage(error) || 'Failed to generate pipeline suggestion';
         log.error(
           `Failed to generate pipeline suggestion (stream=${params.path.name}` +
-            ` connectorId=${params.body.connector_id}${formatInferenceErrorMeta(
-              error
-            )}): ${errorMessage}`
+            ` connectorId=${connectorId}${formatInferenceErrorMeta(error)}): ${errorMessage}`
         );
         if (isSSEError(error) && error.status) {
           throw createSSERequestError(errorMessage, error.status);
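
The `from(...)` + `catchError(...)` structure used throughout this route is plain RxJS: wrap the long-running async work in an Observable so the SSE channel can end with a terminal error event instead of a dropped connection. A minimal self-contained sketch of the pattern — `runSuggestion` and the event shape are illustrative only, not this route's actual types:

```ts
import { from, of, catchError } from 'rxjs';

// Hypothetical stand-in for the route's long-running LLM work.
async function runSuggestion(): Promise<{ pipeline: object | null }> {
  return { pipeline: null };
}

// from(promise) emits the resolved value and completes; catchError maps a
// rejection into a renderable event rather than tearing down the stream.
const events$ = from(runSuggestion()).pipe(
  catchError((error: unknown) =>
    of({ type: 'error' as const, message: error instanceof Error ? error.message : String(error) })
  )
);

events$.subscribe((event) => {
  // In Kibana this would be serialized as a server-sent event; here we just log it.
  console.log(event);
});
```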
@@ -42,6 +42,7 @@ export interface ProcessingDissectSuggestionsParams {
 
 export interface ProcessingDissectSuggestionsHandlerDeps {
   params: ProcessingDissectSuggestionsParams;
+  connectorId: string;
   inferenceClient: InferenceClient;
   scopedClusterClient: IScopedClusterClient;
   streamsClient: StreamsClient;
@@ -66,6 +67,7 @@ type FieldReviewResults = ToolCallsOfToolOptions<
 
 export const handleProcessingDissectSuggestions = async ({
   params,
+  connectorId,
   inferenceClient,
   streamsClient,
   fieldsMetadataClient,
@@ -74,7 +76,6 @@ export const handleProcessingDissectSuggestions = async ({
   logger,
 }: ProcessingDissectSuggestionsHandlerDeps): Promise<DissectProcessor | null> => {
   const { name: streamName } = params.path;
-  const { connector_id: connectorId } = params.body;
 
   logger.debug(
     `Starting extraction (stream=${streamName} messages=${params.body.sample_messages.length} connectorId=${connectorId})`
@@ -35,6 +35,7 @@ export interface ProcessingGrokSuggestionsParams {
 
 export interface ProcessingGrokSuggestionsHandlerDeps {
   params: ProcessingGrokSuggestionsParams;
+  connectorId: string;
   inferenceClient: InferenceClient;
   scopedClusterClient: IScopedClusterClient;
   streamsClient: StreamsClient;
@@ -59,6 +60,7 @@ type FieldReviewResults = ToolCallsOfToolOptions<
 
 export const handleProcessingGrokSuggestions = async ({
   params,
+  connectorId,
   inferenceClient,
   streamsClient,
   fieldsMetadataClient,
@@ -67,7 +69,6 @@ export const handleProcessingGrokSuggestions = async ({
   logger,
 }: ProcessingGrokSuggestionsHandlerDeps): Promise<GrokProcessor | null> => {
   const { name: streamName } = params.path;
-  const { connector_id: connectorId } = params.body;
 
   logger.debug(
     `Starting extraction (stream=${streamName} messages=${params.body.sample_messages.length} connectorId=${connectorId})`
@@ -128,10 +128,13 @@ export const processingGrokSuggestionRoute = createServerRoute({
       request,
     });
 
+    const { connector_id: connectorId } = params.body;
+
    // Wrap in Observable SSE to avoid timeout issues with long-running LLM requests
     return from(
       handleProcessingGrokSuggestions({
         params,
+        connectorId,
         inferenceClient,
         streamsClient,
         scopedClusterClient,
@@ -192,10 +195,13 @@ export const processingDissectSuggestionRoute = createServerRoute({
       request,
     });
 
+    const { connector_id: connectorId } = params.body;
+
    // Wrap in Observable SSE to avoid timeout issues with long-running LLM requests
     return from(
       handleProcessingDissectSuggestions({
         params,
+        connectorId,
         inferenceClient,
         streamsClient,
         scopedClusterClient,
3 changes: 3 additions & 0 deletions x-pack/platform/plugins/shared/streams_app/moon.yml
@@ -111,6 +111,9 @@ dependsOn:
   - '@kbn/cps'
   - '@kbn/shared-ux-ai-components'
   - '@kbn/deeplinks-management'
+  - '@kbn/core-http-browser'
+  - '@kbn/core-ui-settings-browser'
+  - '@kbn/inference-connectors'
 tags:
   - plugin
   - prod