Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@
*/

export { KnowledgeIndicatorsTable } from './knowledge_indicators_table';
export { KiGenerationProvider } from './ki_generation_context';
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { ListStreamDetail } from '@kbn/streams-plugin/server/routes/internal/streams/crud/route';
import type { OnboardingResult, TaskResult } from '@kbn/streams-schema';
import {
OnboardingStep,
STREAMS_SIG_EVENTS_KI_EXTRACTION_INFERENCE_FEATURE_ID,
STREAMS_SIG_EVENTS_KI_QUERY_GENERATION_INFERENCE_FEATURE_ID,
TaskStatus,
} from '@kbn/streams-schema';
import React, {
createContext,
useCallback,
useContext,
useEffect,
useMemo,
useRef,
useState,
} from 'react';
import { useInferenceFeatureConnectors } from '../../../../../hooks/sig_events/use_inference_feature_connectors';
import { useIndexPatternsConfig } from '../../../../../hooks/use_index_patterns_config';
import type { ScheduleOnboardingOptions } from '../../../../../hooks/use_onboarding_api';
import { useBulkOnboarding } from '../../hooks/use_bulk_onboarding';
import { useFetchStreams } from '../../hooks/use_fetch_streams';
import type { OnboardingConfig } from '../shared/types';

const IN_PROGRESS_STATUSES = new Set<TaskStatus>([TaskStatus.InProgress, TaskStatus.BeingCanceled]);

interface ConnectorState {
resolvedConnectorId: string | undefined;
loading: boolean;
}

interface KiGenerationContextValue {
filteredStreams: ListStreamDetail[] | undefined;
isStreamsLoading: boolean;
isInitialGenerationStatusLoading: boolean;
generatingStreamNames: string[];
isGenerating: boolean;
isScheduling: boolean;
streamStatusMap: Record<string, TaskResult<OnboardingResult>>;
onboardingConfig: OnboardingConfig;
setOnboardingConfig: (config: OnboardingConfig) => void;
featuresConnectors: ConnectorState;
queriesConnectors: ConnectorState;
bulkOnboardAll: (streamNames: string[]) => Promise<string[]>;
bulkOnboardFeaturesOnly: (streamNames: string[]) => Promise<string[]>;
bulkOnboardQueriesOnly: (streamNames: string[]) => Promise<string[]>;
bulkScheduleOnboardingTask: (
streamNames: string[],
options?: ScheduleOnboardingOptions
) => Promise<string[]>;
cancelOnboardingTask: (streamName: string) => Promise<void>;
}

const KiGenerationReactContext = createContext<KiGenerationContextValue | null>(null);

interface KiGenerationProviderProps {
children: React.ReactNode;
onTaskCompleted?: () => void;
onTaskFailed?: (error: string) => void;
}

export function KiGenerationProvider({
children,
onTaskCompleted,
onTaskFailed,
}: KiGenerationProviderProps) {
const [generatingStreams, setGeneratingStreams] = useState<Set<string>>(new Set());
const [streamStatusMap, setStreamStatusMap] = useState<
Record<string, TaskResult<OnboardingResult>>
>({});
const initialStatusFetchDoneRef = useRef(false);
// Dedup guard: filteredStreams gets a new array reference on every render
// (due to the select transform), which re-fires the status-fetch effect.
// This ref tracks already-enqueued names so only truly new streams trigger
// network calls.
const enqueuedStreamNamesRef = useRef<Set<string>>(new Set());

const { filterStreamsByIndexPatterns } = useIndexPatternsConfig();

const featuresConnectors = useInferenceFeatureConnectors(
STREAMS_SIG_EVENTS_KI_EXTRACTION_INFERENCE_FEATURE_ID
);
const queriesConnectors = useInferenceFeatureConnectors(
STREAMS_SIG_EVENTS_KI_QUERY_GENERATION_INFERENCE_FEATURE_ID
);

const [onboardingConfig, setOnboardingConfig] = useState<OnboardingConfig>({
steps: [OnboardingStep.FeaturesIdentification, OnboardingStep.QueriesGeneration],
connectors: {},
});

useEffect(() => {
setOnboardingConfig((prev) => {
const features = prev.connectors.features ?? featuresConnectors.resolvedConnectorId;
const queries = prev.connectors.queries ?? queriesConnectors.resolvedConnectorId;
if (features === prev.connectors.features && queries === prev.connectors.queries) {
return prev;
}
return { ...prev, connectors: { features, queries } };
});
}, [featuresConnectors.resolvedConnectorId, queriesConnectors.resolvedConnectorId]);

const streamsListFetch = useFetchStreams({
select: (result) => ({
...result,
streams: filterStreamsByIndexPatterns(result.streams),
}),
});
const filteredStreams = streamsListFetch.data?.streams;
const isStreamsLoading = streamsListFetch.isLoading;

// Adds streams discovered as InProgress (e.g. on initial status fetch after
// page refresh) and removes streams that reach a terminal state. Callback
// forwarding is gated on the initial-fetch flag so initial-load updates
// don't trigger consumer side effects (like error toasts).
const onStreamStatusUpdate = useCallback(
(streamName: string, taskResult: TaskResult<OnboardingResult>) => {
setStreamStatusMap((current) => ({ ...current, [streamName]: taskResult }));

const isInProgress = IN_PROGRESS_STATUSES.has(taskResult.status);

setGeneratingStreams((current) => {
const has = current.has(streamName);
if (isInProgress === has) return current;
const next = new Set(current);
if (isInProgress) {
next.add(streamName);
} else {
next.delete(streamName);
}
return next;
});

if (initialStatusFetchDoneRef.current) {
if (taskResult.status === TaskStatus.Failed) {
onTaskFailed?.(taskResult.error ?? 'Unknown error');
}
if (taskResult.status === TaskStatus.Completed) {
onTaskCompleted?.();
}
}
},
[onTaskCompleted, onTaskFailed]
);

const bulkOnboarding = useBulkOnboarding({ onboardingConfig, onStreamStatusUpdate });
const {
onboardingStatusUpdateQueue,
processStatusUpdateQueue,
bulkOnboardAll: rawBulkOnboardAll,
bulkOnboardFeaturesOnly: rawBulkOnboardFeaturesOnly,
bulkOnboardQueriesOnly: rawBulkOnboardQueriesOnly,
bulkScheduleOnboardingTask: rawBulkScheduleOnboardingTask,
} = bulkOnboarding;

useEffect(() => {
if (!filteredStreams) return;

let hasNew = false;
filteredStreams.forEach((item) => {
if (!enqueuedStreamNamesRef.current.has(item.stream.name)) {
enqueuedStreamNamesRef.current.add(item.stream.name);
onboardingStatusUpdateQueue.add(item.stream.name);
hasNew = true;
}
});
if (hasNew) {
processStatusUpdateQueue().finally(() => {
initialStatusFetchDoneRef.current = true;
});
}
}, [filteredStreams, onboardingStatusUpdateQueue, processStatusUpdateQueue]);

const isGenerating = generatingStreams.size > 0;
const generatingStreamNames = useMemo(() => Array.from(generatingStreams), [generatingStreams]);

// True until we've received at least one status result for every filtered
// stream, so consumers can defer rendering empty/generating UI until the
// generating set is known. Once false, stays false — transient refetches of
// the streams list must not flash the loading panel again.
const isInitialGenerationStatusLoading = useMemo(() => {
if (initialStatusFetchDoneRef.current) return false;
if (isStreamsLoading || !filteredStreams) return true;
return filteredStreams.some((item) => !(item.stream.name in streamStatusMap));
}, [isStreamsLoading, filteredStreams, streamStatusMap]);

const withGeneratingTracking = useCallback(
Comment thread
cesco-f marked this conversation as resolved.
(action: (streamNames: string[]) => Promise<string[]>) =>
async (streamNames: string[]): Promise<string[]> => {
if (streamNames.length > 0) {
setGeneratingStreams((current) => new Set([...current, ...streamNames]));
}
const succeeded = await action(streamNames);
if (succeeded.length < streamNames.length) {
const succeededSet = new Set(succeeded);
const failed = streamNames.filter((s) => !succeededSet.has(s));
setGeneratingStreams((current) => {
const next = new Set(current);
failed.forEach((s) => next.delete(s));
return next;
});
}
return succeeded;
},
[]
);

const bulkOnboardAll = useMemo(
() => withGeneratingTracking(rawBulkOnboardAll),
[withGeneratingTracking, rawBulkOnboardAll]
);
const bulkOnboardFeaturesOnly = useMemo(
() => withGeneratingTracking(rawBulkOnboardFeaturesOnly),
[withGeneratingTracking, rawBulkOnboardFeaturesOnly]
);
const bulkOnboardQueriesOnly = useMemo(
() => withGeneratingTracking(rawBulkOnboardQueriesOnly),
[withGeneratingTracking, rawBulkOnboardQueriesOnly]
);
const bulkScheduleOnboardingTask = useCallback(
(streamNames: string[], options?: ScheduleOnboardingOptions) =>
withGeneratingTracking((names) => rawBulkScheduleOnboardingTask(names, options))(streamNames),
[withGeneratingTracking, rawBulkScheduleOnboardingTask]
);

const value = useMemo<KiGenerationContextValue>(
() => ({
isScheduling: bulkOnboarding.isScheduling,
cancelOnboardingTask: bulkOnboarding.cancelOnboardingTask,
filteredStreams,
isStreamsLoading,
isInitialGenerationStatusLoading,
generatingStreamNames,
isGenerating,
streamStatusMap,
onboardingConfig,
setOnboardingConfig,
featuresConnectors,
queriesConnectors,
bulkOnboardAll,
bulkOnboardFeaturesOnly,
bulkOnboardQueriesOnly,
bulkScheduleOnboardingTask,
}),
[
bulkOnboarding.isScheduling,
bulkOnboarding.cancelOnboardingTask,
filteredStreams,
isStreamsLoading,
isInitialGenerationStatusLoading,
generatingStreamNames,
isGenerating,
streamStatusMap,
onboardingConfig,
setOnboardingConfig,
featuresConnectors,
queriesConnectors,
bulkOnboardAll,
bulkOnboardFeaturesOnly,
bulkOnboardQueriesOnly,
bulkScheduleOnboardingTask,
]
);

return (
<KiGenerationReactContext.Provider value={value}>{children}</KiGenerationReactContext.Provider>
);
}

export function useKiGeneration(): KiGenerationContextValue {
const context = useContext(KiGenerationReactContext);
if (!context) {
throw new Error('useKiGeneration must be used within KiGenerationProvider');
}
return context;
}
Loading
Loading