diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/index.ts b/x-pack/platform/packages/shared/kbn-streams-schema/index.ts index fa89b29768609..386c19440ab07 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/index.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/index.ts @@ -271,4 +271,7 @@ export { STREAMS_SIG_EVENTS_KI_EXTRACTION_INFERENCE_FEATURE_ID, STREAMS_SIG_EVENTS_KI_QUERY_GENERATION_INFERENCE_FEATURE_ID, STREAMS_SIG_EVENTS_DISCOVERY_INFERENCE_FEATURE_ID, + STREAMS_INFERENCE_PARENT_FEATURE_ID, + STREAMS_PARTITIONING_SUGGESTIONS_INFERENCE_FEATURE_ID, + STREAMS_PROCESSING_SUGGESTIONS_INFERENCE_FEATURE_ID, } from './src/inference_feature_ids'; diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/inference_feature_ids.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/inference_feature_ids.ts index 68c6d92a4c627..f2645b3d2bb6a 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/inference_feature_ids.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/inference_feature_ids.ts @@ -20,3 +20,14 @@ export const STREAMS_SIG_EVENTS_KI_QUERY_GENERATION_INFERENCE_FEATURE_ID = /** Discovery and significant event generation. */ export const STREAMS_SIG_EVENTS_DISCOVERY_INFERENCE_FEATURE_ID = 'streams_sig_events_discovery' as const; + +/** Parent feature for Streams (Inference Feature Registry). */ +export const STREAMS_INFERENCE_PARENT_FEATURE_ID = 'streams' as const; + +/** Partitioning suggestions. */ +export const STREAMS_PARTITIONING_SUGGESTIONS_INFERENCE_FEATURE_ID = + 'streams_partitioning_suggestions' as const; + +/** Processing suggestions (pipelines, grok/dissect processors). 
*/ +export const STREAMS_PROCESSING_SUGGESTIONS_INFERENCE_FEATURE_ID = + 'streams_processing_suggestions' as const; diff --git a/x-pack/platform/plugins/shared/streams/server/plugin.ts b/x-pack/platform/plugins/shared/streams/server/plugin.ts index 0383380908ebe..f11400bf20901 100644 --- a/x-pack/platform/plugins/shared/streams/server/plugin.ts +++ b/x-pack/platform/plugins/shared/streams/server/plugin.ts @@ -58,6 +58,7 @@ import { baseFields } from './lib/streams/component_templates/logs_layer'; import { ecsBaseFields } from './lib/streams/component_templates/logs_ecs_layer'; import { registerStreamsAgentBuilder } from './agent_builder/register'; import { registerSignificantEventsInferenceFeatures } from './register_significant_events_inference_features'; +import { registerSuggestionsInferenceFeatures } from './register_suggestions_inference_features'; import { PatternExtractionService } from './lib/pattern_extraction/pattern_extraction_service'; import { createStreamsSettingsStorageClient } from './lib/streams/storage/streams_settings_storage_client'; import { @@ -133,6 +134,10 @@ export class StreamsPlugin plugins.searchInferenceEndpoints, this.logger.get('inference-features') ); + registerSuggestionsInferenceFeatures( + plugins.searchInferenceEndpoints, + this.logger.get('inference-features') + ); const inferenceResolver = createInferenceResolver(this.logger); diff --git a/x-pack/platform/plugins/shared/streams/server/register_suggestions_inference_features.ts b/x-pack/platform/plugins/shared/streams/server/register_suggestions_inference_features.ts new file mode 100644 index 0000000000000..a7dceac2c0771 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/register_suggestions_inference_features.ts @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Logger } from '@kbn/core/server'; +import type { SearchInferenceEndpointsPluginSetup } from '@kbn/search-inference-endpoints/server'; +import { i18n } from '@kbn/i18n'; +import { + STREAMS_INFERENCE_PARENT_FEATURE_ID, + STREAMS_PARTITIONING_SUGGESTIONS_INFERENCE_FEATURE_ID, + STREAMS_PROCESSING_SUGGESTIONS_INFERENCE_FEATURE_ID, +} from '@kbn/streams-schema'; + +/** + * Registers Streams parent + child suggestion features with the Inference Feature Registry. + * No-op when the searchInferenceEndpoints plugin is unavailable. + */ +export function registerSuggestionsInferenceFeatures( + searchInferenceEndpoints: SearchInferenceEndpointsPluginSetup | undefined, + logger: Logger +): void { + if (!searchInferenceEndpoints) { + return; + } + + const { register } = searchInferenceEndpoints.features; + + const parentResult = register({ + featureId: STREAMS_INFERENCE_PARENT_FEATURE_ID, + featureName: i18n.translate('xpack.streams.inferenceFeature.suggestionsParentName', { + defaultMessage: 'Streams', + }), + featureDescription: i18n.translate( + 'xpack.streams.inferenceFeature.suggestionsParentDescription', + { + defaultMessage: 'AI models used for Streams suggestions.', + } + ), + taskType: 'chat_completion', + recommendedEndpoints: [], + }); + if (parentResult.ok) { + logger.debug(`Registered parent inference feature "${STREAMS_INFERENCE_PARENT_FEATURE_ID}"`); + } else { + logger.warn( + `Failed to register inference feature "${STREAMS_INFERENCE_PARENT_FEATURE_ID}": ${parentResult.error}` + ); + } + + const children: Array<{ + featureId: string; + featureName: string; + featureDescription: string; + recommendedEndpoints: string[]; + }> = [ + { + featureId: STREAMS_PARTITIONING_SUGGESTIONS_INFERENCE_FEATURE_ID, + featureName: i18n.translate('xpack.streams.inferenceFeature.partitionSuggestionsName', { + defaultMessage: 
'Partitioning suggestions', + }), + featureDescription: i18n.translate( + 'xpack.streams.inferenceFeature.partitionSuggestionsDescription', + { + defaultMessage: 'Model used to suggest partitions.', + } + ), + recommendedEndpoints: [], + }, + { + featureId: STREAMS_PROCESSING_SUGGESTIONS_INFERENCE_FEATURE_ID, + featureName: i18n.translate('xpack.streams.inferenceFeature.processingSuggestionsName', { + defaultMessage: 'Processing suggestions', + }), + featureDescription: i18n.translate( + 'xpack.streams.inferenceFeature.processingSuggestionsDescription', + { + defaultMessage: 'Model used to suggest processing pipelines and grok/dissect processors.', + } + ), + recommendedEndpoints: [], + }, + ]; + + for (const child of children) { + const childResult = register({ + featureId: child.featureId, + parentFeatureId: STREAMS_INFERENCE_PARENT_FEATURE_ID, + featureName: child.featureName, + featureDescription: child.featureDescription, + taskType: 'chat_completion', + recommendedEndpoints: child.recommendedEndpoints, + }); + if (childResult.ok) { + logger.debug(`Registered child inference feature "${child.featureId}"`); + } else { + logger.warn( + `Failed to register inference feature "${child.featureId}": ${childResult.error}` + ); + } + } +} diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/connectors/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/connectors/route.ts index dd255fd3d196b..85485738bb0dd 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/connectors/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/connectors/route.ts @@ -9,25 +9,6 @@ import { z } from '@kbn/zod/v4'; import { STREAMS_API_PRIVILEGES } from '../../../../common/constants'; import { createServerRoute } from '../../create_server_route'; -export const getConnectorsRoute = createServerRoute({ - endpoint: 'GET /internal/streams/connectors', - options: { - access: 'internal', - summary: 'Get GenAI 
connectors', - description: 'Fetches all available GenAI connectors for AI features', - }, - security: { - authz: { - requiredPrivileges: [STREAMS_API_PRIVILEGES.read], - }, - }, - handler: async ({ request, server }) => { - const connectors = await server.inference.getConnectorList(request); - - return { connectors }; - }, -}); - export const getConnectorByIdRoute = createServerRoute({ endpoint: 'GET /internal/streams/connectors/{connectorId}', options: { @@ -49,6 +30,5 @@ export const getConnectorByIdRoute = createServerRoute({ }); export const connectorRoutes = { - ...getConnectorsRoute, ...getConnectorByIdRoute, }; diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/suggest_partitions_route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/suggest_partitions_route.ts index c1d3da2e5c96a..e287c73d83004 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/suggest_partitions_route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/suggest_partitions_route.ts @@ -79,6 +79,8 @@ export const suggestPartitionsRoute = createServerRoute({ request, }); + const { connector_id: connectorId } = params.body; + const stream = await streamsClient.getStream(params.path.name); if (!Streams.WiredStream.Definition.is(stream)) { throw new StatusError('Partitioning suggestions are only available for wired streams', 400); @@ -86,7 +88,7 @@ export const suggestPartitionsRoute = createServerRoute({ const partitionsPromise = partitionStream({ definition: stream, - inferenceClient: inferenceClient.bindTo({ connectorId: params.body.connector_id }), + inferenceClient: inferenceClient.bindTo({ connectorId }), esClient: scopedClusterClient.asCurrentUser, logger, start: params.body.start, diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/suggest_processing_pipeline_route.ts 
b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/suggest_processing_pipeline_route.ts index 28394314874a3..d08f4c6597c07 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/suggest_processing_pipeline_route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/management/suggest_processing_pipeline_route.ts @@ -91,9 +91,7 @@ export const suggestProcessingPipelineRoute = createServerRoute({ telemetry, }): Promise => { const log = logger.get('suggestProcessingPipeline'); - log.debug( - `Request received (stream=${params.path.name} connectorId=${params.body.connector_id})` - ); + const { connector_id: connectorId } = params.body; // Wrap entire logic in Observable so errors can be sent as SSE events return from( @@ -108,6 +106,8 @@ export const suggestProcessingPipelineRoute = createServerRoute({ const { inferenceClient, scopedClusterClient, streamsClient, fieldsMetadataClient } = await getScopedClients({ request }); + log.debug(`Request received (stream=${params.path.name} connectorId=${connectorId})`); + const stream = await streamsClient.getStream(params.path.name); if (!Streams.ingest.all.Definition.is(stream)) { throw new StatusError( @@ -134,7 +134,7 @@ export const suggestProcessingPipelineRoute = createServerRoute({ > = []; log.debug( - `Scheduling parallel grok + dissect extraction (stream=${stream.name} messages=${messages.length} fieldName=${fieldName} connectorId=${params.body.connector_id})` + `Scheduling parallel grok + dissect extraction (stream=${stream.name} messages=${messages.length} fieldName=${fieldName} connectorId=${connectorId})` ); candidatePromises.push( @@ -142,7 +142,7 @@ export const suggestProcessingPipelineRoute = createServerRoute({ messages, fieldName, streamName: stream.name, - connectorId: params.body.connector_id, + connectorId, documents: params.body.documents, patternExtractionService, inferenceClient, @@ -159,7 +159,7 @@ export 
const suggestProcessingPipelineRoute = createServerRoute({ messages, fieldName, streamName: stream.name, - connectorId: params.body.connector_id, + connectorId, documents: params.body.documents, patternExtractionService, inferenceClient, @@ -185,13 +185,13 @@ export const suggestProcessingPipelineRoute = createServerRoute({ const { reason } = result; if (isNoLLMSuggestionsError(reason)) { log.debug( - `No LLM suggestions available (stream=${stream.name} connectorId=${params.body.connector_id})` + `No LLM suggestions available (stream=${stream.name} connectorId=${connectorId})` ); } else { const meta = formatInferenceErrorMeta(reason); log.error( `Candidate failed (stream=${stream.name}` + - ` connectorId=${params.body.connector_id}${meta}): ${getErrorMessage(reason)}` + ` connectorId=${connectorId}${meta}): ${getErrorMessage(reason)}` ); } } @@ -216,7 +216,7 @@ export const suggestProcessingPipelineRoute = createServerRoute({ const result = await suggestProcessingPipeline({ definition: stream, - inferenceClient: inferenceClient.bindTo({ connectorId: params.body.connector_id }), + inferenceClient: inferenceClient.bindTo({ connectorId }), parsingProcessor, maxSteps, signal: abortController.signal, @@ -237,11 +237,11 @@ export const suggestProcessingPipelineRoute = createServerRoute({ const durationMs = Date.now() - startTime; log.debug( - `Processing pipeline generated (stream=${stream.name} connectorId=${ - params.body.connector_id - } durationMs=${durationMs} steps=${result.metadata.stepsUsed} hasPipeline=${ - result.pipeline !== null - })` + `Processing pipeline generated (stream=${ + stream.name + } connectorId=${connectorId} durationMs=${durationMs} steps=${ + result.metadata.stepsUsed + } hasPipeline=${result.pipeline !== null})` ); telemetry.trackProcessingPipelineSuggested({ @@ -262,7 +262,7 @@ export const suggestProcessingPipelineRoute = createServerRoute({ catchError((error) => { if (isNoLLMSuggestionsError(error)) { log.debug( - `No LLM suggestions 
available for pipeline generation (stream=${params.path.name} connectorId=${params.body.connector_id})` + `No LLM suggestions available for pipeline generation (stream=${params.path.name} connectorId=${connectorId})` ); // Return null pipeline instead of error - frontend will handle this gracefully return [ @@ -275,9 +275,7 @@ export const suggestProcessingPipelineRoute = createServerRoute({ const errorMessage = getErrorMessage(error) || 'Failed to generate pipeline suggestion'; log.error( `Failed to generate pipeline suggestion (stream=${params.path.name}` + - ` connectorId=${params.body.connector_id}${formatInferenceErrorMeta( - error - )}): ${errorMessage}` + ` connectorId=${connectorId}${formatInferenceErrorMeta(error)}): ${errorMessage}` ); if (isSSEError(error) && error.status) { throw createSSERequestError(errorMessage, error.status); diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/dissect_suggestions_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/dissect_suggestions_handler.ts index 69b1241f6a5bd..96a776f43ea10 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/dissect_suggestions_handler.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/dissect_suggestions_handler.ts @@ -42,6 +42,7 @@ export interface ProcessingDissectSuggestionsParams { export interface ProcessingDissectSuggestionsHandlerDeps { params: ProcessingDissectSuggestionsParams; + connectorId: string; inferenceClient: InferenceClient; scopedClusterClient: IScopedClusterClient; streamsClient: StreamsClient; @@ -66,6 +67,7 @@ type FieldReviewResults = ToolCallsOfToolOptions< export const handleProcessingDissectSuggestions = async ({ params, + connectorId, inferenceClient, streamsClient, fieldsMetadataClient, @@ -74,7 +76,6 @@ export const handleProcessingDissectSuggestions = async ({ logger, }: 
ProcessingDissectSuggestionsHandlerDeps): Promise => { const { name: streamName } = params.path; - const { connector_id: connectorId } = params.body; logger.debug( `Starting extraction (stream=${streamName} messages=${params.body.sample_messages.length} connectorId=${connectorId})` diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/grok_suggestions_handler.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/grok_suggestions_handler.ts index ce1ad28261f2f..a55394e459a70 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/grok_suggestions_handler.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/grok_suggestions_handler.ts @@ -35,6 +35,7 @@ export interface ProcessingGrokSuggestionsParams { export interface ProcessingGrokSuggestionsHandlerDeps { params: ProcessingGrokSuggestionsParams; + connectorId: string; inferenceClient: InferenceClient; scopedClusterClient: IScopedClusterClient; streamsClient: StreamsClient; @@ -59,6 +60,7 @@ type FieldReviewResults = ToolCallsOfToolOptions< export const handleProcessingGrokSuggestions = async ({ params, + connectorId, inferenceClient, streamsClient, fieldsMetadataClient, @@ -67,7 +69,6 @@ export const handleProcessingGrokSuggestions = async ({ logger, }: ProcessingGrokSuggestionsHandlerDeps): Promise => { const { name: streamName } = params.path; - const { connector_id: connectorId } = params.body; logger.debug( `Starting extraction (stream=${streamName} messages=${params.body.sample_messages.length} connectorId=${connectorId})` diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts index 830f68efde428..08926e89077bf 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts +++ 
b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/route.ts @@ -128,10 +128,13 @@ export const processingGrokSuggestionRoute = createServerRoute({ request, }); + const { connector_id: connectorId } = params.body; + // Wrap in Observable SSE to avoid timeout issues with long-running LLM requests return from( handleProcessingGrokSuggestions({ params, + connectorId, inferenceClient, streamsClient, scopedClusterClient, @@ -192,10 +195,13 @@ export const processingDissectSuggestionRoute = createServerRoute({ request, }); + const { connector_id: connectorId } = params.body; + // Wrap in Observable SSE to avoid timeout issues with long-running LLM requests return from( handleProcessingDissectSuggestions({ params, + connectorId, inferenceClient, streamsClient, scopedClusterClient, diff --git a/x-pack/platform/plugins/shared/streams_app/moon.yml b/x-pack/platform/plugins/shared/streams_app/moon.yml index 23af75ff35ad2..637dc9d90b542 100644 --- a/x-pack/platform/plugins/shared/streams_app/moon.yml +++ b/x-pack/platform/plugins/shared/streams_app/moon.yml @@ -111,6 +111,9 @@ dependsOn: - '@kbn/cps' - '@kbn/shared-ux-ai-components' - '@kbn/deeplinks-management' + - '@kbn/core-http-browser' + - '@kbn/core-ui-settings-browser' + - '@kbn/inference-connectors' tags: - plugin - prod diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_management/data_management/stream_detail_routing/review_suggestions_form/generate_suggestions_button.test.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_management/data_management/stream_detail_routing/review_suggestions_form/generate_suggestions_button.test.tsx index 08aff99f8f71f..f6dbb22226c11 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_management/data_management/stream_detail_routing/review_suggestions_form/generate_suggestions_button.test.tsx +++ 
b/x-pack/platform/plugins/shared/streams_app/public/components/stream_management/data_management/stream_detail_routing/review_suggestions_form/generate_suggestions_button.test.tsx @@ -57,7 +57,6 @@ const createMockGenAiConnectors = ( selectConnector: jest.fn(), reloadConnectors: jest.fn(), isConnectorSelectionRestricted: false, - defaultConnector: undefined, ...overrides, }); diff --git a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_ai_features.tsx b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_ai_features.tsx index 5ee6218041f8f..dec562a0a3d18 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_ai_features.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_ai_features.tsx @@ -28,7 +28,7 @@ export interface AIFeatures { export function useAIFeatures(): AIFeatures | null { const { dependencies: { - start: { licensing, streams }, + start: { licensing }, }, core, } = useKibana(); @@ -36,8 +36,8 @@ export function useAIFeatures(): AIFeatures | null { const isAIAvailableForTier = core.pricing.isFeatureAvailable(STREAMS_TIERED_AI_FEATURE.id); const genAiConnectors = useGenAIConnectors({ - streamsRepositoryClient: streams.streamsRepositoryClient, - uiSettings: core.uiSettings, + http: core.http, + settings: core.settings, }); const license = useObservable(licensing.license$); const [tourCalloutDismissed, setTourCalloutDismissed] = useElasticLlmCalloutDismissed( diff --git a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_genai_connectors.test.ts b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_genai_connectors.test.ts index c8ce2d33cef4e..fd0fa97185f2e 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_genai_connectors.test.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_genai_connectors.test.ts @@ -7,53 +7,59 @@ import { renderHook, act, waitFor } from '@testing-library/react'; import { useGenAIConnectors } from 
'./use_genai_connectors'; -import type { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; -import type { IUiSettingsClient } from '@kbn/core/public'; +import type { HttpSetup, IHttpFetchError } from '@kbn/core-http-browser'; +import type { SettingsStart } from '@kbn/core-ui-settings-browser'; import { InferenceConnectorType } from '@kbn/inference-common'; -import { - GEN_AI_SETTINGS_DEFAULT_AI_CONNECTOR, - GEN_AI_SETTINGS_DEFAULT_AI_CONNECTOR_DEFAULT_ONLY, -} from '@kbn/management-settings-ids'; + +jest.mock('@kbn/inference-connectors', () => { + const actual = jest.requireActual('@kbn/inference-connectors'); + return { + ...actual, + useLoadConnectors: jest.fn(), + }; +}); + +import { useLoadConnectors } from '@kbn/inference-connectors'; + +const mockUseLoadConnectors = useLoadConnectors as jest.MockedFunction; const STREAMS_CONNECTOR_STORAGE_KEY = 'xpack.streamsApp.lastUsedConnector'; const OLD_STORAGE_KEY = 'xpack.observabilityAiAssistant.lastUsedConnector'; -const createMockConnector = (connectorId: string, name: string) => ({ - connectorId, +const createMockAIConnector = (id: string, name: string) => ({ + id, name, - type: InferenceConnectorType.OpenAI, + actionTypeId: InferenceConnectorType.OpenAI, config: {}, - capabilities: {}, + secrets: {}, isPreconfigured: false, - isInferenceEndpoint: false, + isSystemAction: false, + isDeprecated: false, + isConnectorTypeDeprecated: false, + isMissingSecrets: false, }); +const createLoadConnectorsResult = ( + connectors: ReturnType[], + overrides: Partial> = {} +) => + ({ + data: connectors, + isLoading: false, + error: null, + refetch: jest.fn(), + soEntryFound: false, + ...overrides, + } as unknown as ReturnType); + describe('useGenAIConnectors', () => { - let mockStreamsRepositoryClient: jest.Mocked; - let mockUiSettings: jest.Mocked; + const mockHttp = {} as HttpSetup; + const mockSettings = {} as SettingsStart; beforeEach(() => { jest.clearAllMocks(); - - // Clear localStorage 
localStorage.removeItem(STREAMS_CONNECTOR_STORAGE_KEY); localStorage.removeItem(OLD_STORAGE_KEY); - - mockStreamsRepositoryClient = { - fetch: jest.fn(), - } as unknown as jest.Mocked; - - mockUiSettings = { - get: jest.fn((key: string, defaultValue?: unknown) => { - if (key === GEN_AI_SETTINGS_DEFAULT_AI_CONNECTOR) { - return 'NO_DEFAULT_CONNECTOR'; - } - if (key === GEN_AI_SETTINGS_DEFAULT_AI_CONNECTOR_DEFAULT_ONLY) { - return defaultValue ?? false; - } - return defaultValue; - }), - } as unknown as jest.Mocked; }); afterEach(() => { @@ -63,158 +69,81 @@ describe('useGenAIConnectors', () => { describe('connector fallback behavior', () => { it('selects the first available connector when selected connector is no longer available', async () => { - // Setup: user has a connector in localStorage that no longer exists localStorage.setItem(STREAMS_CONNECTOR_STORAGE_KEY, JSON.stringify('deleted-connector')); - mockStreamsRepositoryClient.fetch.mockResolvedValue({ - connectors: [ - createMockConnector('connector-1', 'Connector 1'), - createMockConnector('connector-2', 'Connector 2'), - ], - }); + mockUseLoadConnectors.mockReturnValue( + createLoadConnectorsResult([ + createMockAIConnector('connector-1', 'Connector 1'), + createMockAIConnector('connector-2', 'Connector 2'), + ]) + ); const { result } = renderHook(() => - useGenAIConnectors({ - streamsRepositoryClient: mockStreamsRepositoryClient, - uiSettings: mockUiSettings, - }) + useGenAIConnectors({ http: mockHttp, settings: mockSettings }) ); - // Wait for connectors to load and fallback to trigger - await waitFor(() => { - expect(result.current.loading).toBe(false); - }); - await waitFor(() => { expect(result.current.selectedConnector).toBe('connector-1'); }); }); - it('does not change connector when selected connector is still available', async () => { + it('does not change connector when selected connector is still available', () => { localStorage.setItem(STREAMS_CONNECTOR_STORAGE_KEY, 
JSON.stringify('connector-2')); - mockStreamsRepositoryClient.fetch.mockResolvedValue({ - connectors: [ - createMockConnector('connector-1', 'Connector 1'), - createMockConnector('connector-2', 'Connector 2'), - ], - }); + mockUseLoadConnectors.mockReturnValue( + createLoadConnectorsResult([ + createMockAIConnector('connector-1', 'Connector 1'), + createMockAIConnector('connector-2', 'Connector 2'), + ]) + ); const { result } = renderHook(() => - useGenAIConnectors({ - streamsRepositoryClient: mockStreamsRepositoryClient, - uiSettings: mockUiSettings, - }) + useGenAIConnectors({ http: mockHttp, settings: mockSettings }) ); - await waitFor(() => { - expect(result.current.loading).toBe(false); - }); - expect(result.current.selectedConnector).toBe('connector-2'); }); - it('falls back to first connector when default connector setting points to non-existent connector', async () => { - // No localStorage value, but default connector doesn't exist - mockUiSettings.get.mockImplementation((key: string, defaultValue?: unknown) => { - if (key === GEN_AI_SETTINGS_DEFAULT_AI_CONNECTOR) { - return 'non-existent-default'; - } - if (key === GEN_AI_SETTINGS_DEFAULT_AI_CONNECTOR_DEFAULT_ONLY) { - return false; - } - return defaultValue; - }); - - mockStreamsRepositoryClient.fetch.mockResolvedValue({ - connectors: [ - createMockConnector('connector-1', 'Connector 1'), - createMockConnector('connector-2', 'Connector 2'), - ], - }); - - const { result } = renderHook(() => - useGenAIConnectors({ - streamsRepositoryClient: mockStreamsRepositoryClient, - uiSettings: mockUiSettings, - }) + it('returns first connector as selectedConnector when no connector is explicitly selected', () => { + mockUseLoadConnectors.mockReturnValue( + createLoadConnectorsResult([ + createMockAIConnector('connector-1', 'Connector 1'), + createMockAIConnector('connector-2', 'Connector 2'), + ]) ); - await waitFor(() => { - expect(result.current.loading).toBe(false); - }); - - await waitFor(() => { - 
expect(result.current.selectedConnector).toBe('connector-1'); - }); - }); - - it('returns first connector as selectedConnector when no connector is explicitly selected', async () => { - // No localStorage value, no default connector - mockStreamsRepositoryClient.fetch.mockResolvedValue({ - connectors: [ - createMockConnector('connector-1', 'Connector 1'), - createMockConnector('connector-2', 'Connector 2'), - ], - }); - const { result } = renderHook(() => - useGenAIConnectors({ - streamsRepositoryClient: mockStreamsRepositoryClient, - uiSettings: mockUiSettings, - }) + useGenAIConnectors({ http: mockHttp, settings: mockSettings }) ); - await waitFor(() => { - expect(result.current.loading).toBe(false); - }); - - // The hook returns `selectedConnector || connectors?.[0]?.id` expect(result.current.selectedConnector).toBe('connector-1'); }); - it('returns undefined selectedConnector when no connectors exist', async () => { - // No localStorage value, no connectors available - mockStreamsRepositoryClient.fetch.mockResolvedValue({ - connectors: [], - }); + it('returns undefined selectedConnector when no connectors exist', () => { + mockUseLoadConnectors.mockReturnValue(createLoadConnectorsResult([])); const { result } = renderHook(() => - useGenAIConnectors({ - streamsRepositoryClient: mockStreamsRepositoryClient, - uiSettings: mockUiSettings, - }) + useGenAIConnectors({ http: mockHttp, settings: mockSettings }) ); - await waitFor(() => { - expect(result.current.loading).toBe(false); - }); - expect(result.current.selectedConnector).toBeUndefined(); expect(result.current.connectors).toEqual([]); }); }); describe('selectConnector', () => { - it('updates the selected connector', async () => { - mockStreamsRepositoryClient.fetch.mockResolvedValue({ - connectors: [ - createMockConnector('connector-1', 'Connector 1'), - createMockConnector('connector-2', 'Connector 2'), - ], - }); + it('updates the selected connector', () => { + mockUseLoadConnectors.mockReturnValue( + 
createLoadConnectorsResult([ + createMockAIConnector('connector-1', 'Connector 1'), + createMockAIConnector('connector-2', 'Connector 2'), + ]) + ); const { result } = renderHook(() => - useGenAIConnectors({ - streamsRepositoryClient: mockStreamsRepositoryClient, - uiSettings: mockUiSettings, - }) + useGenAIConnectors({ http: mockHttp, settings: mockSettings }) ); - await waitFor(() => { - expect(result.current.loading).toBe(false); - }); - act(() => { result.current.selectConnector('connector-2'); }); @@ -224,55 +153,64 @@ describe('useGenAIConnectors', () => { }); describe('loading state', () => { - it('starts with loading true', () => { - mockStreamsRepositoryClient.fetch.mockReturnValue(new Promise(() => {})); // Never resolves + it('reports loading when useLoadConnectors is loading', () => { + mockUseLoadConnectors.mockReturnValue( + createLoadConnectorsResult([], { isLoading: true, data: undefined }) + ); const { result } = renderHook(() => - useGenAIConnectors({ - streamsRepositoryClient: mockStreamsRepositoryClient, - uiSettings: mockUiSettings, - }) + useGenAIConnectors({ http: mockHttp, settings: mockSettings }) ); expect(result.current.loading).toBe(true); }); - it('sets loading to false after fetch completes', async () => { - mockStreamsRepositoryClient.fetch.mockResolvedValue({ - connectors: [createMockConnector('connector-1', 'Connector 1')], - }); + it('reports not loading when useLoadConnectors finishes', () => { + mockUseLoadConnectors.mockReturnValue( + createLoadConnectorsResult([createMockAIConnector('connector-1', 'Connector 1')]) + ); const { result } = renderHook(() => - useGenAIConnectors({ - streamsRepositoryClient: mockStreamsRepositoryClient, - uiSettings: mockUiSettings, - }) + useGenAIConnectors({ http: mockHttp, settings: mockSettings }) ); - await waitFor(() => { - expect(result.current.loading).toBe(false); - }); + expect(result.current.loading).toBe(false); }); }); describe('error handling', () => { - it('sets error when fetch 
fails', async () => { - const error = new Error('Failed to fetch connectors'); - mockStreamsRepositoryClient.fetch.mockRejectedValue(error); + it('exposes the error from useLoadConnectors', () => { + const error = new Error('Failed to fetch connectors') as unknown as IHttpFetchError; + mockUseLoadConnectors.mockReturnValue( + createLoadConnectorsResult([], { error, data: undefined }) + ); const { result } = renderHook(() => - useGenAIConnectors({ - streamsRepositoryClient: mockStreamsRepositoryClient, - uiSettings: mockUiSettings, - }) + useGenAIConnectors({ http: mockHttp, settings: mockSettings }) ); - await waitFor(() => { - expect(result.current.loading).toBe(false); - }); - expect(result.current.error).toBe(error); expect(result.current.connectors).toBeUndefined(); }); }); + + describe('connector mapping', () => { + it('maps AIConnector to InferenceConnector shape', () => { + mockUseLoadConnectors.mockReturnValue( + createLoadConnectorsResult([createMockAIConnector('c-1', 'My Connector')]) + ); + + const { result } = renderHook(() => + useGenAIConnectors({ http: mockHttp, settings: mockSettings }) + ); + + expect(result.current.connectors).toEqual([ + expect.objectContaining({ + connectorId: 'c-1', + name: 'My Connector', + type: InferenceConnectorType.OpenAI, + }), + ]); + }); + }); }); diff --git a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_genai_connectors.ts b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_genai_connectors.ts index 88868fa9f60b3..5d3c59f9560d7 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/hooks/use_genai_connectors.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/hooks/use_genai_connectors.ts @@ -5,20 +5,16 @@ * 2.0. 
 */ -import { useState, useEffect, useCallback, useMemo } from 'react'; +import { useCallback, useMemo } from 'react'; import useLocalStorage from 'react-use/lib/useLocalStorage'; -import type { IUiSettingsClient } from '@kbn/core/public'; -import type { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; -import type { InferenceConnector } from '@kbn/inference-common'; -import { - GEN_AI_SETTINGS_DEFAULT_AI_CONNECTOR, - GEN_AI_SETTINGS_DEFAULT_AI_CONNECTOR_DEFAULT_ONLY, -} from '@kbn/management-settings-ids'; +import type { HttpSetup } from '@kbn/core-http-browser'; +import type { SettingsStart } from '@kbn/core-ui-settings-browser'; +import type { InferenceConnector, InferenceConnectorType } from '@kbn/inference-common'; +import { useLoadConnectors, type AIConnector } from '@kbn/inference-connectors'; +import { STREAMS_INFERENCE_PARENT_FEATURE_ID } from '@kbn/streams-schema'; const STREAMS_CONNECTOR_STORAGE_KEY = 'xpack.streamsApp.lastUsedConnector'; const OLD_STORAGE_KEY = 'xpack.observabilityAiAssistant.lastUsedConnector'; -// TODO: Import from gen-ai-settings-plugin (package) once available -const NO_DEFAULT_CONNECTOR = 'NO_DEFAULT_CONNECTOR'; export interface UseGenAIConnectorsResult { connectors: InferenceConnector[] | undefined; @@ -28,104 +24,57 @@ export interface UseGenAIConnectorsResult { selectConnector: (id: string) => void; reloadConnectors: () => Promise<void>; isConnectorSelectionRestricted: boolean; - defaultConnector: string | undefined; } +const toInferenceConnector = (c: AIConnector): InferenceConnector => ({ + connectorId: c.id, + name: c.name, + type: c.actionTypeId as InferenceConnectorType, + config: 'config' in c ?
(c.config as Record<string, unknown>) : {}, + capabilities: {}, + isPreconfigured: c.isPreconfigured, + isInferenceEndpoint: false, + isEis: c.isEis, + isDeprecated: c.isDeprecated, + isMissingSecrets: c.isMissingSecrets, +}); + export function useGenAIConnectors({ - streamsRepositoryClient, - uiSettings, + http, + settings, }: { - streamsRepositoryClient: StreamsRepositoryClient; - uiSettings: IUiSettingsClient; + http: HttpSetup; + settings: SettingsStart; }): UseGenAIConnectorsResult { - const [connectors, setConnectors] = useState(); - const [loading, setLoading] = useState(true); - const [error, setError] = useState(); - - // Read settings - const defaultConnector = uiSettings.get(GEN_AI_SETTINGS_DEFAULT_AI_CONNECTOR); - const genAISettingsDefaultOnly = uiSettings.get( - GEN_AI_SETTINGS_DEFAULT_AI_CONNECTOR_DEFAULT_ONLY, - false - ); + const { + data: aiConnectors, + isLoading, + error: queryError, + refetch, + soEntryFound, + } = useLoadConnectors({ + http, + featureId: STREAMS_INFERENCE_PARENT_FEATURE_ID, + settings, + }); + + const connectors = useMemo(() => aiConnectors?.map(toInferenceConnector), [aiConnectors]); + + const isConnectorSelectionRestricted = soEntryFound; - const isConnectorSelectionRestricted = - genAISettingsDefaultOnly && defaultConnector !== NO_DEFAULT_CONNECTOR; - - // Read old localStorage key (for backward compatibility, don't modify it) const [oldConnector] = useLocalStorage(OLD_STORAGE_KEY); - - // Use old connector as initial value for new key (only if new key doesn't exist yet) const [lastUsedConnector, setLastUsedConnector] = useLocalStorage( STREAMS_CONNECTOR_STORAGE_KEY, oldConnector ); - const fetchConnectors = useCallback(async () => { - setLoading(true); - setError(undefined); - - try { - const controller = new AbortController(); - const response = await streamsRepositoryClient.fetch('GET /internal/streams/connectors', { - signal: controller.signal, - }); - let results = response.connectors; - - // If connector selection is restricted, only
return the default connector - if (isConnectorSelectionRestricted) { - const defaultC = results.find((con) => con.connectorId === defaultConnector); - results = defaultC ? [defaultC] : []; - } - - setConnectors(results); - - // Clear lastUsedConnector if it's no longer in the list - setLastUsedConnector((connectorId) => { - if ( - connectorId && - results.findIndex((result) => result.connectorId === connectorId) === -1 - ) { - return undefined; - } - return connectorId; - }); - } catch (err) { - setError(err as Error); - setConnectors(undefined); - } finally { - setLoading(false); - } - }, [ - streamsRepositoryClient, - isConnectorSelectionRestricted, - defaultConnector, - setLastUsedConnector, - ]); - - useEffect(() => { - fetchConnectors(); - }, [fetchConnectors]); - - // Determine selected connector (follows observability pattern) const selectedConnector = useMemo(() => { - // If restricted, always use default - if (isConnectorSelectionRestricted) { - return defaultConnector; - } - - // Priority 1: User's explicit choice (localStorage) - if (lastUsedConnector) { + const ids = connectors?.map((c) => c.connectorId); + if (lastUsedConnector && ids?.includes(lastUsedConnector)) { return lastUsedConnector; } - - // Priority 2: Global AI default setting - if (defaultConnector !== NO_DEFAULT_CONNECTOR) { - return defaultConnector; - } - - return undefined; - }, [isConnectorSelectionRestricted, defaultConnector, lastUsedConnector]); + return connectors?.[0]?.connectorId; + }, [lastUsedConnector, connectors]); const selectConnector = useCallback( (id: string) => { @@ -135,30 +84,16 @@ export function useGenAIConnectors({ ); const reloadConnectors = useCallback(async () => { - await fetchConnectors(); - }, [fetchConnectors]); - - // If the selected connector is no longer available, select the first available connector - useEffect(() => { - const availableConnectors = connectors?.map((connector) => connector.connectorId); - - if ( - selectedConnector && - 
availableConnectors && - !availableConnectors.includes(selectedConnector) - ) { - setLastUsedConnector(availableConnectors[0]); // First or undefined if empty - } - }, [connectors, setLastUsedConnector, selectedConnector]); + await refetch(); + }, [refetch]); return { connectors, - selectedConnector: selectedConnector || connectors?.[0]?.connectorId, - loading, - error, + selectedConnector, + loading: isLoading, + error: queryError ?? undefined, selectConnector, reloadConnectors, isConnectorSelectionRestricted, - defaultConnector: defaultConnector === NO_DEFAULT_CONNECTOR ? undefined : defaultConnector, }; } diff --git a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/create_steps.spec.ts b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/create_steps.spec.ts index e3b4abf068e91..062b4d23fd88b 100644 --- a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/create_steps.spec.ts +++ b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/create_steps.spec.ts @@ -41,10 +41,10 @@ test.describe( }) => { // Mock the connectors endpoint to return no connectors, ensuring AI features // are disabled regardless of the environment (local, ECH, serverless) - await page.route('**/internal/streams/connectors', async (route) => { + await page.route('**/internal/search_inference_endpoints/connectors*', async (route) => { await route.fulfill({ status: 200, - body: JSON.stringify({ connectors: [] }), + body: JSON.stringify({ connectors: [], allConnectors: [], soEntryFound: false }), }); }); await page.reload(); diff --git a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/pipeline_suggestions.spec.ts b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/pipeline_suggestions.spec.ts index 9ef09b01b3cf6..4163e441ee93b 
100644 --- a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/pipeline_suggestions.spec.ts +++ b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_processing/pipeline_suggestions.spec.ts @@ -80,10 +80,10 @@ test.describe( await expect(pageObjects.streams.getSuggestPipelineButton()).toBeVisible(); // Mock no connectors and verify button is hidden - await page.route('**/internal/streams/connectors', async (route) => { + await page.route('**/internal/search_inference_endpoints/connectors*', async (route) => { await route.fulfill({ status: 200, - body: JSON.stringify({ connectors: [] }), + body: JSON.stringify({ connectors: [], allConnectors: [], soEntryFound: false }), }); }); await page.reload(); diff --git a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_routing/ai_suggestions_button.spec.ts b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_routing/ai_suggestions_button.spec.ts index 0de451e36cd23..2137e92c4c419 100644 --- a/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_routing/ai_suggestions_button.spec.ts +++ b/x-pack/platform/plugins/shared/streams_app/test/scout/ui/tests/data_management/data_routing/ai_suggestions_button.spec.ts @@ -46,10 +46,10 @@ test.describe('Stream data routing - AI suggestions button', { tag: tags.statefu }); test('should disable button when no connector is selected', async ({ page }) => { - await page.route('**/internal/streams/connectors', async (route) => { + await page.route('**/internal/search_inference_endpoints/connectors*', async (route) => { await route.fulfill({ status: 200, - body: JSON.stringify({ connectors: [] }), + body: JSON.stringify({ connectors: [], allConnectors: [], soEntryFound: false }), }); }); @@ -60,22 +60,32 @@ test.describe('Stream data routing - AI suggestions button', { tag: tags.statefu }); test('should show connector 
dropdown when multiple connectors exist', async ({ page }) => { - await page.route('**/internal/streams/connectors', async (route) => { + await page.route('**/internal/search_inference_endpoints/connectors*', async (route) => { await route.fulfill({ status: 200, body: JSON.stringify({ - connectors: [ + connectors: [], + allConnectors: [ { - id: 'test-connector-1', + connectorId: 'test-connector-1', name: 'Test Connector 1', - actionTypeId: '.gen-ai', + type: '.gen-ai', + config: {}, + capabilities: {}, + isPreconfigured: false, + isInferenceEndpoint: false, }, { - id: 'test-connector-2', + connectorId: 'test-connector-2', name: 'Test Connector 2', - actionTypeId: '.gen-ai', + type: '.gen-ai', + config: {}, + capabilities: {}, + isPreconfigured: false, + isInferenceEndpoint: false, }, ], + soEntryFound: false, }), }); }); diff --git a/x-pack/platform/plugins/shared/streams_app/tsconfig.json b/x-pack/platform/plugins/shared/streams_app/tsconfig.json index d7d2058442722..9ca42e38a9d24 100644 --- a/x-pack/platform/plugins/shared/streams_app/tsconfig.json +++ b/x-pack/platform/plugins/shared/streams_app/tsconfig.json @@ -112,5 +112,8 @@ "@kbn/cps", "@kbn/shared-ux-ai-components", "@kbn/deeplinks-management", + "@kbn/core-http-browser", + "@kbn/core-ui-settings-browser", + "@kbn/inference-connectors", ] }