diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx index 467435335dd73..907996e5866b3 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processor_outcome_preview.tsx @@ -18,8 +18,9 @@ import { import { Sample } from '@kbn/grok-ui'; import { i18n } from '@kbn/i18n'; import { GrokProcessorDefinition } from '@kbn/streams-schema'; -import { isEmpty } from 'lodash'; import React, { useMemo } from 'react'; +import { isEmpty } from 'lodash'; +import { getPercentageFormatter } from '../../../util/formatters'; import { PreviewTable } from '../preview_table'; import { PreviewDocsFilterOption, @@ -104,10 +105,7 @@ export const ProcessorOutcomePreview = () => { ); }; -const formatter = new Intl.NumberFormat('en-US', { - style: 'percent', - maximumFractionDigits: 1, -}); +const formatter = getPercentageFormatter(); const formatRateToPercentage = (rate?: number) => (rate ? formatter.format(rate) : undefined) as any; // This is a workaround for the type error, since the numFilters & numActiveFilters props are defined as number | undefined diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx index 16d273ba0f986..821cb0c1e5180 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_enrichment/processors/processor_metrics.tsx @@ -20,14 +20,12 @@ import React from 'react'; import { i18n } from '@kbn/i18n'; import useToggle from 'react-use/lib/useToggle'; import { css } from '@emotion/react'; +import { getPercentageFormatter } from '../../../../util/formatters'; import { ProcessorMetrics } from '../state_management/simulation_state_machine'; type ProcessorMetricBadgesProps = ProcessorMetrics; -const formatter = new Intl.NumberFormat('en-US', { - style: 'percent', - maximumFractionDigits: 1, -}); +const formatter = getPercentageFormatter(); export const ProcessorMetricBadges = ({ detected_fields, diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/index.tsx index 62c98944c5781..8760bd9c3d4e9 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/index.tsx @@ -4,6 +4,7 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ +import React from 'react'; import { EuiButton, EuiFlexGroup, @@ -14,11 +15,11 @@ import { } from '@elastic/eui'; import { css } from '@emotion/css'; import { Streams } from '@kbn/streams-schema'; -import React from 'react'; import { useUnsavedChangesPrompt } from '@kbn/unsaved-changes-prompt'; import { i18n } from '@kbn/i18n'; import { toMountPoint } from '@kbn/react-kibana-mount'; import { CoreStart } from '@kbn/core/public'; +import { useTimefilter } from '../../../hooks/use_timefilter'; import { useKibana } from '../../../hooks/use_kibana'; import { useStreamsAppFetch } from '../../../hooks/use_streams_app_fetch'; import { ChildStreamList } from './child_stream_list'; @@ -47,12 +48,15 @@ export function StreamDetailRouting(props: StreamDetailRoutingProps) { streams: { streamsRepositoryClient }, } = dependencies.start; + const { timeState$ } = useTimefilter(); + return ( diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/preview_matches.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/preview_matches.tsx index 48e2a8ba50d2a..c9d40e1c07958 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/preview_matches.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/preview_matches.tsx @@ -8,7 +8,7 @@ import { EuiText, EuiLoadingSpinner, EuiIconTip } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; import React from 'react'; -import { AsyncSample } from '../../../hooks/queries/use_async_sample'; +import { RoutingSamplesContext } from './state_management/stream_routing_state_machine/routing_samples_state_machine'; const matchText = i18n.translate('xpack.streams.streamRouting.previewMatchesText', { defaultMessage: 'Approximate match rate', @@ -23,9 +23,9 @@ export const PreviewMatches = ({ error, isLoading, }: { - approximateMatchingPercentage: AsyncSample['approximateMatchingPercentage']; - error: AsyncSample['documentCountsError']; - isLoading: AsyncSample['isLoadingDocumentCounts']; + approximateMatchingPercentage: RoutingSamplesContext['approximateMatchingPercentage']; + error?: RoutingSamplesContext['approximateMatchingPercentageError']; + isLoading: boolean; }) => { if (isLoading) { return ( @@ -39,8 +39,7 @@ export const PreviewMatches = ({ if (error) { return ( - {`${matchText}: `} - {errorText} + {`${matchText}: ${errorText}`} ); } @@ -48,8 +47,7 @@ export const PreviewMatches = ({ if (approximateMatchingPercentage) { return ( - {`${matchText}: `} - {`${approximateMatchingPercentage}%`} + {`${matchText}: ${approximateMatchingPercentage}`} ); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/preview_panel.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/preview_panel.tsx index 9f9dd7c8695f0..44f5e18520173 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/preview_panel.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/preview_panel.tsx @@ -16,119 +16,121 @@ import { } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; import { isEmpty } from 'lodash'; -import React, { useEffect } from 'react'; -import { useAsyncSample } from '../../../hooks/queries/use_async_sample'; -import { useTimefilter } from '../../../hooks/use_timefilter'; -import { useDebounced } from '../../../util/use_debounce'; +import React from 'react'; import { AssetImage } from '../../asset_image'; import { StreamsAppSearchBar } from '../../streams_app_search_bar'; import { PreviewTable } from '../preview_table'; import { PreviewMatches } from './preview_matches'; import { - selectCurrentRule, + useStreamSamplesSelector, useStreamsRoutingSelector, } from './state_management/stream_routing_state_machine'; export function PreviewPanel() { const routingSnapshot = useStreamsRoutingSelector((snapshot) => snapshot); - const isIdle = routingSnapshot.matches({ ready: 'idle' }); - const isCreatingNewRule = routingSnapshot.matches({ ready: 'creatingNewRule' }); - const isEditingRule = routingSnapshot.matches({ ready: 'editingRule' }); - const isReorideringRules = routingSnapshot.matches({ ready: 'reorderingRules' }); + let content; - const condition = isCreatingNewRule ? selectCurrentRule(routingSnapshot.context).if : undefined; - const definition = routingSnapshot.context.definition; + if (routingSnapshot.matches({ ready: 'idle' })) { + content = ; + } else if ( + routingSnapshot.matches({ ready: 'editingRule' }) || + routingSnapshot.matches({ ready: 'reorderingRules' }) + ) { + content = ; + } else if (routingSnapshot.matches({ ready: 'creatingNewRule' })) { + content = ; + } - const debouncedCondition = useDebounced(condition, 300); + return ( + <> + + + + + + {i18n.translate('xpack.streams.streamDetail.preview.header', { + defaultMessage: 'Data Preview', + })} + + + + + + + {content} + + ); +} - const { timeState, timeState$ } = useTimefilter(); +const IdlePanel = () => ( + } + titleSize="s" + title={ +

+ {i18n.translate('xpack.streams.streamDetail.preview.editPreviewMessageEmpty', { + defaultMessage: 'Your preview will appear here', + })} +

+ } + body={i18n.translate('xpack.streams.streamDetail.preview.editPreviewMessageEmptyDescription', { + defaultMessage: + 'Create a new child stream to see what will be routed to it based on the conditions', + })} + /> +); +const EditingPanel = () => ( + } + titleSize="s" + title={ +

+ {i18n.translate('xpack.streams.streamDetail.preview.editPreviewMessage', { + defaultMessage: 'Preview is not available while editing or reordering streams', + })} +

+ } + body={ + <> +

+ {i18n.translate('xpack.streams.streamDetail.preview.editPreviewMessageBody', { + defaultMessage: + 'Once you save your changes, the results of your conditions will appear here.', + })} +

+

+ {i18n.translate('xpack.streams.streamDetail.preview.editPreviewReorderingWarning', { + defaultMessage: + 'Additionally, you will not be able to edit existing streams while reordering them, you should save or cancel your changes first.', + })} +

+ + } + /> +); + +const RuleCreationPanel = () => { + const samplesSnapshot = useStreamSamplesSelector((snapshot) => snapshot); + const isLoadingDocuments = samplesSnapshot.matches({ fetching: { documents: 'loading' } }); + const isUpdating = + samplesSnapshot.matches('debouncingCondition') || + samplesSnapshot.matches({ fetching: { documents: 'loading' } }); const { - isLoadingDocuments, documents, documentsError, - refresh, approximateMatchingPercentage, - isLoadingDocumentCounts, - documentCountsError, - } = useAsyncSample({ - condition: debouncedCondition, - start: timeState.start, - end: timeState.end, - size: 100, - streamDefinition: definition, - }); - + approximateMatchingPercentageError, + } = samplesSnapshot.context; const hasDocuments = !isEmpty(documents); + const isLoadingDocumentCounts = samplesSnapshot.matches({ + fetching: { documentCounts: 'loading' }, + }); - useEffect(() => { - const subscription = timeState$.subscribe({ - next: ({ kind }) => { - if (kind === 'override') { - refresh(); - } - }, - }); - return () => { - subscription.unsubscribe(); - }; - }, [timeState$, refresh]); - - let content; + let content = null; - if (isIdle) { - content = ( - } - titleSize="s" - title={ -

- {i18n.translate('xpack.streams.streamDetail.preview.editPreviewMessageEmpty', { - defaultMessage: 'Your preview will appear here', - })} -

- } - body={i18n.translate( - 'xpack.streams.streamDetail.preview.editPreviewMessageEmptyDescription', - { - defaultMessage: - 'Create a new child stream to see what will be routed to it based on the conditions', - } - )} - /> - ); - } else if (isEditingRule || isReorideringRules) { - content = ( - } - titleSize="s" - title={ -

- {i18n.translate('xpack.streams.streamDetail.preview.editPreviewMessage', { - defaultMessage: 'Preview is not available while editing or reordering streams', - })} -

- } - body={ - <> -

- {i18n.translate('xpack.streams.streamDetail.preview.editPreviewMessageBody', { - defaultMessage: - 'You will find here the result from the conditions you have made once you save the changes', - })} -

-

- {i18n.translate('xpack.streams.streamDetail.preview.editPreviewReorderingWarning', { - defaultMessage: - 'Additionally, you will not be able to edit existing streams while reordering them, you should save or cancel your changes first.', - })} -

- - } - /> - ); - } else if (isCreatingNewRule && isLoadingDocuments && !hasDocuments) { + if (isLoadingDocuments && !hasDocuments) { content = ( } @@ -146,7 +148,7 @@ export function PreviewPanel() { })} /> ); - } else if (isCreatingNewRule && documentsError) { + } else if (documentsError) { content = ( } @@ -162,7 +164,7 @@ export function PreviewPanel() { body={documentsError.message} /> ); - } else if (isCreatingNewRule && !hasDocuments) { + } else if (!hasDocuments) { content = ( } @@ -176,18 +178,18 @@ export function PreviewPanel() { } /> ); - } else if (isCreatingNewRule && hasDocuments) { + } else if (hasDocuments) { content = ( - + ); @@ -195,22 +197,8 @@ export function PreviewPanel() { return ( <> - - {isLoadingDocuments && } - - - - - {i18n.translate('xpack.streams.streamDetail.preview.header', { - defaultMessage: 'Data Preview', - })} - - - - - - - {content} + {isUpdating && } + {content} ); -} +}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/routing_samples_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/routing_samples_state_machine.ts new file mode 100644 index 0000000000000..381dbedc0b22a --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/routing_samples_state_machine.ts @@ -0,0 +1,507 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + setup, + assign, + ActorRefFrom, + fromObservable, + fromEventObservable, + MachineImplementationsFrom, + SnapshotFrom, +} from 'xstate5'; +import { Observable, filter, map, switchMap, timeout, catchError, throwError } from 'rxjs'; +import { isRunningResponse } from '@kbn/data-plugin/common'; +import { + Condition, + SampleDocument, + Streams, + conditionToQueryDsl, + getConditionFields, + isAlwaysCondition, +} from '@kbn/streams-schema'; +import { isEmpty, isNumber } from 'lodash'; +import { DataPublicPluginStart } from '@kbn/data-plugin/public'; +import { getPlaceholderFor } from '@kbn/xstate-utils'; +import { TimefilterHook } from '@kbn/data-plugin/public/query/timefilter/use_timefilter'; +import { i18n } from '@kbn/i18n'; +import { MappingRuntimeFields } from '@elastic/elasticsearch/lib/api/types'; +import { getPercentageFormatter } from '../../../../../util/formatters'; +import { emptyEqualsToAlways } from '../../../../../util/condition'; + +export interface RoutingSamplesMachineDeps { + data: DataPublicPluginStart; + timeState$: TimefilterHook['timeState$']; +} + +export type RoutingSamplesActorRef = ActorRefFrom; +export type RoutingSamplesActorSnapshot = SnapshotFrom; + +export interface RoutingSamplesInput { + condition?: Condition; + definition: Streams.WiredStream.GetResponse; +} + +export interface RoutingSamplesContext { + condition?: Condition; + definition: Streams.WiredStream.GetResponse; + documents: SampleDocument[]; + documentsError?: Error; + approximateMatchingPercentage?: string; + approximateMatchingPercentageError?: Error; +} + +export type RoutingSamplesEvent = + | { type: 'routingSamples.refresh' } + | { type: 'routingSamples.updateCondition'; condition?: Condition }; + +export interface SearchParams extends RoutingSamplesInput { + start: number; + end: number; +} + +export interface CollectorParams { + data: DataPublicPluginStart; + input: RoutingSamplesInput; +} + +const SAMPLES_SIZE = 100; +const PROBABILITY_THRESHOLD = 100000; +const SEARCH_TIMEOUT_MS = 10000; // 10 seconds + +export const routingSamplesMachine = setup({ + types: { + input: {} as RoutingSamplesInput, + context: {} as RoutingSamplesContext, + events: {} as RoutingSamplesEvent, + }, + actors: { + collectDocuments: getPlaceholderFor(createDocumentsCollectorActor), + collectDocumentsCount: getPlaceholderFor(createDocumentsCountCollectorActor), + subscribeTimeUpdates: getPlaceholderFor(createTimeUpdatesActor), + }, + actions: { + updateCondition: assign((_, params: { condition?: Condition }) => ({ + condition: params.condition, + })), + storeDocuments: assign((_, params: { documents: SampleDocument[] }) => ({ + documents: params.documents, + })), + storeDocumentsError: assign((_, params: { error: Error | undefined }) => ({ + documentsError: params.error, + })), + storeDocumentCounts: assign((_, params: { count: string }) => ({ + approximateMatchingPercentage: params.count, + approximateMatchingPercentageError: undefined, + })), + storeDocumentCountsError: assign((_, params: { error: Error }) => ({ + approximateMatchingPercentageError: params.error, + })), + }, + delays: { + conditionUpdateDebounceTime: 500, + }, + guards: { + isValidSnapshot: (_, params: { context?: SampleDocument[] | string }) => + params.context !== undefined, + }, +}).createMachine({ + /** @xstate-layout N4IgpgJg5mDOIC5QCcD2BXALgSwHZQGUBDAWwAcAbOAYjSz0NMrgDpkwAzd2ACwG0ADAF1EoMqljYcqXKJAAPRADYAjCwDsAJiUCBSgJwqAHAGYVAViXqlAGhABPRABZLLc9aUmr6rSpMBffzs6HHxicipYWgxQxgjWdDIIIkwwAGEZCClsGUERJBBxSWlZAsUEL3MWIxUVJ2N9ARV9cxVbB2cTExYdXU16vyNzIydA4JiGcOZYFggwACMMXABjBgzcLJLqeVhMFLAWIg5U5AAKZczsmQBVJP2AEQWl5bAAFWwSMABKaPowpkisye6BWa0uJTyciKV1KoHKJnUVTq6hMTicAjM6IEljsjgQnm6mi0NRM+nq5j0+jGIBCkwBrA4YEwyx4DFmqGW6E+uEwMwoqCIWXw1AgMgOeAAbqgANYHC4UKjLTD3DlcsA82CQgrQkpycrmExGFhOKxGAyIymafS4xD6O0sO0Y7HqCwYlTU2n-eIzRnM1n4dmc7m8lj8wUMahgZBoZAsSgpDioZAkFjyxXK1XBzXCKESGF6xAG-QsPzuQxE-RKE02hBGdRG8zmJx2pxGTReTzqD0TL3TFi+lls0VB9WYDIgkNhoVQEViliSmVy1AKsBKlUjjXjnlasR53VlQtoh3qASNJ31EbWjoIPwqdRuUwmPpafp+bt-OJ9gf+qCBtU8rdJwFadI2jJM4woBMkxTNNVwzDdeUAndCj3HJYQUQ8qk8Iw22MLo-HUK88RUTQBE0Y1zErAR1CcDxdHMQIghAXBUDmeACk9T9IlzYo0ILBAsMNUxhhwusTRUGtNCMAQS10AR0TvTRtBcd9YimQE5kWEFVnwdZNj47VUJkfj0WLZsVGomjaKaPQa0sGS2jLEZsUbRtVLpb1+yZQd8B4-MDwE7psOE0SxNUGsT26RtzOaC9K00dze0Bb8h0zUd2N3XjjICizyJRFp9FMQihn0JSIsMFhsWbA1DSJQiqSYzj1IZbyfz-LNQ2Ahg-P3OFEFdDRSUoorCsosrryMfR70aVtRpafCGMansuJav1UoQmZRVwMAeoMvrayaB1SMRA1NAsk0TBrSajXrCxrFPWp9BMBKlo-ZqfVa9b-zHJZeV27L9ostRgpMEScJo8Lr1qDFjS8JSDWk9xnsSlaPrWgNh2+wC+S63zDKy9DyjqYsQbBsKa0emSntUIlaZcFoUferz0d-THg2x9ltv+wnEGk7oyIsO8alIswJKhs7yOpto2wNJGXsCIA */ + id: 'routingSamples', + context: ({ input }) => ({ + condition: input.condition, + definition: input.definition, + approximateMatchingPercentage: undefined, + documents: [], + documentsError: undefined, + approximateMatchingPercentageError: undefined, + }), + initial: 'fetching', + invoke: { + id: 'subscribeTimeUpdatesActor', + src: 'subscribeTimeUpdates', + }, + on: { + 'routingSamples.refresh': { + target: '.fetching', + reenter: true, + }, + 'routingSamples.updateCondition': { + target: '.debouncingCondition', + reenter: true, + actions: [{ type: 'updateCondition', params: ({ event }) => event }], + }, + }, + states: { + debouncingCondition: { + after: { + conditionUpdateDebounceTime: 'fetching', + }, + }, + fetching: { + type: 'parallel', + states: { + documents: { + initial: 'loading', + states: { + loading: { + entry: [{ type: 'storeDocumentsError', params: { error: undefined } }], + invoke: { + id: 'collectDocuments', + src: 'collectDocuments', + input: ({ context }) => ({ + condition: context.condition, + definition: context.definition, + }), + onSnapshot: { + guard: { + type: 'isValidSnapshot', + params: ({ event }) => ({ context: event.snapshot.context }), + }, + actions: [ + { + type: 'storeDocuments', + params: ({ event }) => ({ documents: event.snapshot.context ?? [] }), + }, + ], + }, + onDone: { + target: 'done', + }, + onError: { + target: 'done', + actions: [ + { + type: 'storeDocuments', + params: { documents: [] as SampleDocument[] }, + }, + { + type: 'storeDocumentsError', + params: ({ event }) => ({ error: event.error as Error }), + }, + ], + }, + }, + }, + done: { + type: 'final', + }, + }, + }, + documentCounts: { + initial: 'loading', + states: { + loading: { + invoke: { + id: 'collectDocumentsCount', + src: 'collectDocumentsCount', + input: ({ context }) => ({ + condition: context.condition, + definition: context.definition, + }), + onSnapshot: { + guard: { + type: 'isValidSnapshot', + params: ({ event }) => ({ context: event.snapshot.context }), + }, + actions: [ + { + type: 'storeDocumentCounts', + params: ({ event }) => ({ count: event.snapshot.context ?? '' }), + }, + ], + }, + onDone: { + target: 'done', + }, + onError: { + target: 'done', + actions: [ + { + type: 'storeDocumentCountsError', + params: ({ event }) => ({ error: event.error as Error }), + }, + ], + }, + }, + }, + done: { + type: 'final', + }, + }, + }, + }, + }, + }, +}); + +export const createRoutingSamplesMachineImplementations = ({ + data, + timeState$, +}: RoutingSamplesMachineDeps): MachineImplementationsFrom => ({ + actors: { + collectDocuments: createDocumentsCollectorActor({ data }), + collectDocumentsCount: createDocumentsCountCollectorActor({ data }), + subscribeTimeUpdates: createTimeUpdatesActor({ timeState$ }), + }, +}); + +export function createDocumentsCollectorActor({ data }: Pick) { + return fromObservable>( + ({ input }) => { + return collectDocuments({ data, input }); + } + ); +} + +export function createDocumentsCountCollectorActor({ + data, +}: Pick) { + return fromObservable>( + ({ input }) => { + return collectDocumentCounts({ data, input }); + } + ); +} + +function createTimeUpdatesActor({ timeState$ }: Pick) { + return fromEventObservable(() => + timeState$.pipe(map(() => ({ type: 'routingSamples.refresh' }))) + ); +} + +function collectDocuments({ data, input }: CollectorParams): Observable { + const abortController = new AbortController(); + + const { start, end } = getAbsoluteTimestamps(data); + const params = buildDocumentsSearchParams({ ...input, start, end }); + + return new Observable((observer) => { + const subscription = data.search + .search({ params }, { abortSignal: abortController.signal, retrieveResults: true }) + .pipe( + filter((result) => !isRunningResponse(result) || !isEmpty(result.rawResponse.hits.hits)), + timeout(SEARCH_TIMEOUT_MS), + map((result) => result.rawResponse.hits.hits.map((hit) => hit._source)), + catchError(handleTimeoutError) + ) + .subscribe(observer); + + return () => { + abortController.abort(); + subscription.unsubscribe(); + }; + }); +} + +const percentageFormatter = getPercentageFormatter({ precision: 2 }); + +function collectDocumentCounts({ data, input }: CollectorParams): Observable { + const abortController = new AbortController(); + + const { start, end } = getAbsoluteTimestamps(data); + const searchParams = { ...input, start, end }; + const params = buildDocumentCountSearchParams(searchParams); + + return new Observable((observer) => { + const subscription = data.search + .search({ params }, { abortSignal: abortController.signal }) + .pipe( + filter((result) => !isRunningResponse(result)), + timeout(SEARCH_TIMEOUT_MS), + switchMap((countResult) => { + const docCount = + !countResult.rawResponse.hits.total || isNumber(countResult.rawResponse.hits.total) + ? countResult.rawResponse.hits.total + : countResult.rawResponse.hits.total.value; + + return data.search + .search( + { + params: buildDocumentCountProbabilitySearchParams({ + ...searchParams, + docCount, + }), + }, + { abortSignal: abortController.signal } + ) + .pipe( + filter((result) => !isRunningResponse(result)), + timeout(SEARCH_TIMEOUT_MS), + map((result) => { + // Aggregations don't return partial results so we just wait until the end + if (result.rawResponse.aggregations) { + // We need to divide this by the sampling / probability factor: + // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-random-sampler-aggregation.html#random-sampler-special-cases + const sampleAgg = result.rawResponse.aggregations.sample as { + doc_count: number; + probability: number; + matching_docs: { doc_count: number }; + }; + const randomSampleDocCount = sampleAgg.doc_count / sampleAgg.probability; + const matchingDocCount = sampleAgg.matching_docs.doc_count; + return percentageFormatter.format(matchingDocCount / randomSampleDocCount); + } + return undefined; + }), + catchError(handleTimeoutError) + ); + }), + catchError(handleTimeoutError) + ) + .subscribe(observer); + + return () => { + abortController.abort(); + subscription.unsubscribe(); + }; + }); +} + +const createTimestampRangeQuery = (start: number, end: number) => ({ + range: { + '@timestamp': { + gte: start, + lte: end, + format: 'epoch_millis', + }, + }, +}); + +const getAbsoluteTimestamps = (data: DataPublicPluginStart) => { + const time = data.query.timefilter.timefilter.getAbsoluteTime(); + + return { + start: new Date(time.from).getTime(), + end: new Date(time.to).getTime(), + }; +}; + +/** + * Create runtime mappings for fields that aren't mapped. + * Conditions could be using fields which are not indexed or they could use it with other types than they are eventually mapped as. + * Because of this we can't rely on mapped fields to draw a sample, instead we need to use runtime fields to simulate what happens during + * ingest in the painless condition checks. + */ +function getRuntimeMappings( + definition: Streams.WiredStream.GetResponse, + condition?: Condition +): MappingRuntimeFields { + if (!condition) return {}; + + const mappedFields = Object.keys({ + ...definition.inherited_fields, + ...definition.stream.ingest.wired.fields, + }); + + return Object.fromEntries( + getConditionFields(condition) + .filter((field) => !mappedFields.includes(field.name)) + .map((field) => [field.name, { type: field.type === 'string' ? 'keyword' : 'double' }]) + ); +} + +function processCondition(condition?: Condition): Condition | undefined { + if (!condition) return undefined; + const convertedCondition = emptyEqualsToAlways(condition); + return convertedCondition && isAlwaysCondition(convertedCondition) + ? undefined + : convertedCondition; +} + +function handleTimeoutError(error: Error) { + if (error.name === 'TimeoutError') { + return throwError( + () => + new Error( + i18n.translate('xpack.streams.routingSamples.documentsSearchTimeoutErrorMessage', { + defaultMessage: + 'Documents search timed out after 10 seconds. Refresh the preview or try simplifying the routing condition.', + }) + ) + ); + } + return throwError(() => error); +} + +function buildDocumentsSearchParams({ condition, start, end, definition }: SearchParams) { + const finalCondition = processCondition(condition); + const runtimeMappings = getRuntimeMappings(definition, finalCondition); + + return { + index: definition.stream.name, + query: { + bool: { + must: [ + finalCondition ? conditionToQueryDsl(finalCondition) : { match_all: {} }, + createTimestampRangeQuery(start, end), + ], + }, + }, + runtime_mappings: runtimeMappings, + size: SAMPLES_SIZE, + sort: [{ '@timestamp': { order: 'desc' } }], + terminate_after: SAMPLES_SIZE, + track_total_hits: false, + allow_partial_search_results: true, + }; +} + +function buildDocumentCountSearchParams({ start, end, definition }: SearchParams) { + return { + index: definition.stream.name, + query: createTimestampRangeQuery(start, end), + size: 0, + track_total_hits: true, + }; +} + +function buildDocumentCountProbabilitySearchParams({ + condition, + definition, + docCount, + end, + start, +}: SearchParams & { docCount?: number }) { + const finalCondition = processCondition(condition); + const runtimeMappings = getRuntimeMappings(definition, finalCondition); + const query = finalCondition ? conditionToQueryDsl(finalCondition) : { match_all: {} }; + const probability = calculateProbability(docCount); + + return { + index: definition.stream.name, + query: createTimestampRangeQuery(start, end), + aggs: { + sample: { + random_sampler: { + probability, + }, + aggs: { + matching_docs: { + filter: query, + }, + }, + }, + }, + runtime_mappings: runtimeMappings, + size: 0, + _source: false, + track_total_hits: false, + }; +} + +/** + * Calculates sampling probability based on document count + */ +function calculateProbability(docCount?: number): number { + if (!docCount || docCount <= PROBABILITY_THRESHOLD) { + return 1; + } + const probability = PROBABILITY_THRESHOLD / docCount; + // Values between 0.5 and 1 are not supported by the random sampler + return probability <= 0.5 ? probability : 1; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/stream_routing_state_machine.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/stream_routing_state_machine.ts index 5a8951897fd52..8195894bffbeb 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/stream_routing_state_machine.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/stream_routing_state_machine.ts @@ -4,7 +4,14 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ -import { MachineImplementationsFrom, assign, and, setup, ActorRefFrom } from 'xstate5'; +import { + MachineImplementationsFrom, + assign, + and, + enqueueActions, + setup, + ActorRefFrom, +} from 'xstate5'; import { getPlaceholderFor } from '@kbn/xstate-utils'; import { Streams, isSchema, routingDefinitionListSchema } from '@kbn/streams-schema'; import { ALWAYS_CONDITION } from '../../../../../util/condition'; @@ -24,6 +31,10 @@ import { import { routingConverter } from '../../utils'; import { RoutingDefinitionWithUIAttributes } from '../../types'; import { selectCurrentRule } from './selectors'; +import { + createRoutingSamplesMachineImplementations, + routingSamplesMachine, +} from './routing_samples_state_machine'; export type StreamRoutingActorRef = ActorRefFrom; @@ -37,6 +48,7 @@ export const streamRoutingMachine = setup({ deleteStream: getPlaceholderFor(createDeleteStreamActor), forkStream: getPlaceholderFor(createForkStreamActor), upsertStream: getPlaceholderFor(createUpsertStreamActor), + routingSamplesMachine: getPlaceholderFor(() => routingSamplesMachine), }, actions: { notifyStreamSuccess: getPlaceholderFor(createStreamSuccessNofitier), @@ -149,6 +161,14 @@ export const streamRoutingMachine = setup({ entry: [{ type: 'addNewRoutingRule' }], exit: [{ type: 'resetRoutingChanges' }], initial: 'changing', + invoke: { + id: 'routingSamplesMachine', + src: 'routingSamplesMachine', + input: ({ context }) => ({ + definition: context.definition, + condition: selectCurrentRule(context).if, + }), + }, states: { changing: { on: { @@ -157,7 +177,17 @@ export const streamRoutingMachine = setup({ actions: [{ type: 'resetRoutingChanges' }], }, 'routingRule.change': { - actions: [{ type: 'patchRule', params: ({ event }) => event }], + actions: enqueueActions(({ enqueue, event }) => { + enqueue({ type: 'patchRule', params: { routingRule: event.routingRule } }); + + // Trigger samples collection only on condition change + if (event.routingRule.if) { + enqueue.sendTo('routingSamplesMachine', { + type: 'routingSamples.updateCondition', + condition: event.routingRule.if, + }); + } + }), }, 'routingRule.edit': { guard: 'hasManagePrivileges', @@ -317,12 +347,20 @@ export const createStreamRoutingMachineImplementations = ({ refreshDefinition, streamsRepositoryClient, core, + data, + timeState$, forkSuccessNofitier, }: StreamRoutingServiceDependencies): MachineImplementationsFrom => ({ actors: { deleteStream: createDeleteStreamActor({ streamsRepositoryClient }), forkStream: createForkStreamActor({ streamsRepositoryClient, forkSuccessNofitier }), upsertStream: createUpsertStreamActor({ streamsRepositoryClient }), + routingSamplesMachine: routingSamplesMachine.provide( + createRoutingSamplesMachineImplementations({ + data, + timeState$, + }) + ), }, actions: { refreshDefinition, diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/types.ts index f46bdfcdc31c6..7b32c2eaeac3b 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/types.ts @@ -9,12 +9,14 @@ import { CoreStart } from '@kbn/core/public'; import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; import { Streams } from '@kbn/streams-schema'; import { DataPublicPluginStart } from '@kbn/data-plugin/public'; +import { TimefilterHook } from '@kbn/data-plugin/public/query/timefilter/use_timefilter'; import { RoutingDefinitionWithUIAttributes } from '../../types'; export interface StreamRoutingServiceDependencies { forkSuccessNofitier: (streamName: string) => void; refreshDefinition: () => void; streamsRepositoryClient: StreamsRepositoryClient; + timeState$: TimefilterHook['timeState$']; core: CoreStart; data: DataPublicPluginStart; } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/use_stream_routing.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/use_stream_routing.tsx index 14f0bf021bdba..5f2202b3293b4 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/use_stream_routing.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/stream_detail_routing/state_management/stream_routing_state_machine/use_stream_routing.tsx @@ -6,7 +6,7 @@ */ import React, { useEffect, useMemo } from 'react'; -import { createActorContext } from '@xstate5/react'; +import { createActorContext, useSelector } from '@xstate5/react'; import { createConsoleInspector } from '@kbn/xstate-utils'; import { waitFor } from 'xstate5'; import { @@ -15,6 +15,10 @@ import { } from './stream_routing_state_machine'; import { StreamRoutingInput, StreamRoutingServiceDependencies } from './types'; import { RoutingDefinitionWithUIAttributes } from '../../types'; +import { + RoutingSamplesActorRef, + RoutingSamplesActorSnapshot, +} from './routing_samples_state_machine'; const consoleInspector = createConsoleInspector(); @@ -92,3 +96,23 @@ const ListenForDefinitionChanges = ({ return children; }; + +export const useStreamSamplesRef = () => { + return useStreamsRoutingSelector( + (state) => state.children.routingSamplesMachine as RoutingSamplesActorRef | undefined + ); +}; + +export const useStreamSamplesSelector = ( + selector: (snapshot: RoutingSamplesActorSnapshot) => T +): T => { + const routingSamplesRef = useStreamSamplesRef(); + + if (!routingSamplesRef) { + throw new Error( + 'useStreamSamplesSelector must be used within a StreamEnrichmentContextProvider' + ); + } + + return useSelector(routingSamplesRef, selector); +}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/hooks/queries/use_async_sample.tsx b/x-pack/platform/plugins/shared/streams_app/public/hooks/queries/use_async_sample.tsx deleted file mode 100644 index d25efd3bdaca7..0000000000000 --- a/x-pack/platform/plugins/shared/streams_app/public/hooks/queries/use_async_sample.tsx +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { useEffect, useMemo, useState, useCallback } from 'react'; -import { - Condition, - SampleDocument, - Streams, - conditionToQueryDsl, - getConditionFields, - isAlwaysCondition, -} from '@kbn/streams-schema'; -import useToggle from 'react-use/lib/useToggle'; -import { MappingRuntimeField, MappingRuntimeFields } from '@elastic/elasticsearch/lib/api/types'; -import { filter, switchMap } from 'rxjs'; -import { isRunningResponse } from '@kbn/data-plugin/common'; -import { useAbortController } from '@kbn/react-hooks'; -import { isEmpty } from 'lodash'; -import { useKibana } from '../use_kibana'; -import { emptyEqualsToAlways } from '../../util/condition'; - -interface Options { - condition?: Condition; - start: number; - end: number; - size?: number; - streamDefinition: Streams.WiredStream.GetResponse; -} - -export const useAsyncSample = (options: Options) => { - const { - dependencies: { - start: { data }, - }, - } = useKibana(); - - const controller = useAbortController(); - - // Documents - const [isLoadingDocuments, toggleIsLoadingDocuments] = useToggle(false); - const [documentsError, setDocumentsError] = useState(); - const [documents, setDocuments] = useState([]); - - // Document counts / percentage - const [isLoadingDocumentCounts, toggleIsLoadingDocumentCounts] = useToggle(false); - const [documentCountsError, setDocumentCountsError] = useState(); - const [approximateMatchingPercentage, setApproximateMatchingPercentage] = useState< - string | undefined - >(); - - const [refreshId, setRefreshId] = useState(0); - - const convertedCondition = useMemo(() => { - const condition = options.condition ? emptyEqualsToAlways(options.condition) : undefined; - return condition && isAlwaysCondition(condition) ? undefined : condition; - }, [options.condition]); - - const refresh = useCallback(() => { - return setRefreshId((id) => id + 1); - }, []); - - useEffect(() => { - if (!options.start || !options.end) { - setDocuments([]); - setApproximateMatchingPercentage(undefined); - return; - } - - const runtimeMappings = getRuntimeMappings(options.streamDefinition, convertedCondition); - - // Documents - toggleIsLoadingDocuments(true); - const documentSubscription = data.search - .search( - { - params: { - index: options.streamDefinition.stream.name, - body: getDocumentsSearchBody(options, runtimeMappings, convertedCondition), - }, - }, - { abortSignal: controller.signal } - ) - .subscribe({ - next: (result) => { - if (!isRunningResponse(result)) { - toggleIsLoadingDocuments(false); - if (result.rawResponse.hits?.hits) { - setDocuments(result.rawResponse.hits.hits.map((hit) => hit._source)); - } - return; - } else if (!isEmpty(result.rawResponse.hits?.hits)) { - setDocuments(result.rawResponse.hits.hits.map((hit) => hit._source)); - } - }, - error: (e) => { - setDocumentsError(e); - toggleIsLoadingDocuments(false); - }, - }); - - toggleIsLoadingDocumentCounts(true); - setApproximateMatchingPercentage(undefined); - const documentCountsSubscription = data.search - .search( - { - params: { - index: options.streamDefinition.stream.name, - body: getDocumentCountForSampleRateSearchBody(options), - }, - }, - { abortSignal: controller.signal } - ) - .pipe( - filter((result) => !isRunningResponse(result)), - switchMap((response) => { - const docCount = - response.rawResponse.hits.total && - typeof response.rawResponse.hits.total !== 'number' && - 'value' in response.rawResponse.hits.total - ? response.rawResponse.hits.total.value - : response.rawResponse.hits.total; - - const probability = calculateProbability(docCount); - - return data.search.search( - { - params: { - index: options.streamDefinition.stream.name, - body: getDocumentCountsSearchBody( - options, - runtimeMappings, - probability, - convertedCondition - ), - }, - }, - { abortSignal: controller.signal } - ); - }) - ) - .subscribe({ - next: (result) => { - if (!isRunningResponse(result)) { - toggleIsLoadingDocumentCounts(false); - } - // Aggregations don't return partial results so we just wait until the end - if (result.rawResponse?.aggregations) { - // We need to divide this by the sampling / probability factor: - // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-random-sampler-aggregation.html#random-sampler-special-cases - const sampleAgg = result.rawResponse.aggregations.sample as { - doc_count: number; - probability: number; - matching_docs: { doc_count: number }; - }; - const randomSampleDocCount = sampleAgg.doc_count / sampleAgg.probability; - - const matchingDocCount = sampleAgg.matching_docs.doc_count; - - const percentage = (100 * matchingDocCount) / randomSampleDocCount; - - setApproximateMatchingPercentage(percentage.toFixed(2)); - } - }, - error: (e) => { - toggleIsLoadingDocumentCounts(false); - setDocumentCountsError(e); - }, - }); - - return () => { - documentSubscription.unsubscribe(); - documentCountsSubscription.unsubscribe(); - }; - // eslint-disable-next-line react-hooks/exhaustive-deps - }, [ - data.search, - convertedCondition, - options.start, - options.end, - options.size, - options.streamDefinition, - refreshId, - ]); - - return { - isLoadingDocuments, - documentsError, - documents, - isLoadingDocumentCounts, - documentCountsError, - approximateMatchingPercentage, - refresh, - }; -}; - -export type AsyncSample = ReturnType; - -// Create runtime mappings for fields that aren't mapped. -// Conditions could be using fields which are not indexed or they could use it with other types than they are eventually mapped as. -// Because of this we can't rely on mapped fields to draw a sample, instead we need to use runtime fields to simulate what happens during -// ingest in the painless condition checks. -const getRuntimeMappings = ( - streamDefinition: Streams.WiredStream.GetResponse, - condition?: Condition -) => { - if (!condition) return {}; - - const wiredMappedFields = - 'wired' in streamDefinition.stream.ingest ? streamDefinition.stream.ingest.wired.fields : {}; - const mappedFields = Object.keys(wiredMappedFields).concat( - Object.keys(streamDefinition.inherited_fields) - ); - - return Object.fromEntries( - getConditionFields(condition) - .filter((field) => !mappedFields.includes(field.name)) - .map((field) => [ - field.name, - { type: field.type === 'string' ? 'keyword' : 'double' } as MappingRuntimeField, - ]) - ); -}; - -const getDocumentsSearchBody = ( - options: Options, - runtimeMappings: MappingRuntimeFields, - condition?: Condition -) => { - const { size, start, end } = options; - - const searchBody = { - query: { - bool: { - must: [ - condition ? conditionToQueryDsl(condition) : { match_all: {} }, - { - range: { - '@timestamp': { - gte: start, - lte: end, - format: 'epoch_millis', - }, - }, - }, - ], - }, - }, - runtime_mappings: runtimeMappings, - sort: [ - { - '@timestamp': { - order: 'desc', - }, - }, - ], - terminate_after: size, - track_total_hits: false, - size, - }; - return searchBody; -}; - -const getDocumentCountForSampleRateSearchBody = (options: Options) => { - const { start, end } = options; - - const searchBody = { - query: { - range: { - '@timestamp': { - gte: start, - lte: end, - format: 'epoch_millis', - }, - }, - }, - track_total_hits: true, - size: 0, - }; - return searchBody; -}; - -const getDocumentCountsSearchBody = ( - options: Options, - runtimeMappings: MappingRuntimeFields, - probability: number, - condition?: Condition -) => { - const { start, end } = options; - - const searchBody = { - query: { - bool: { - must: [ - { - range: { - '@timestamp': { - gte: start, - lte: end, - format: 'epoch_millis', - }, - }, - }, - ], - }, - }, - aggs: { - sample: { - random_sampler: { - probability, - }, - aggs: { - matching_docs: { - filter: condition ? conditionToQueryDsl(condition) : { match_all: {} }, - }, - }, - }, - }, - runtime_mappings: runtimeMappings, - size: 0, - _source: false, - track_total_hits: false, - }; - return searchBody; -}; - -const calculateProbability = (docCount?: number) => { - if (!docCount) return 1; - const probabilityThreshold = 100000; - if (docCount > probabilityThreshold) { - const probability = probabilityThreshold / docCount; - // Values between 0.5 and 1 are not supported by the random sampler - return probability <= 0.5 ? probability : 1; - } else { - return 1; - } -}; diff --git a/x-pack/platform/plugins/shared/streams_app/public/util/formatters.ts b/x-pack/platform/plugins/shared/streams_app/public/util/formatters.ts new file mode 100644 index 0000000000000..5e375f42497a8 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/util/formatters.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { i18n } from '@kbn/i18n'; + +export function getPercentageFormatter(opts?: { precision: number }): Intl.NumberFormat { + return new Intl.NumberFormat(i18n.getLocale(), { + style: 'percent', + maximumFractionDigits: opts?.precision ?? 1, + }); +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/util/use_debounce.ts b/x-pack/platform/plugins/shared/streams_app/public/util/use_debounce.ts deleted file mode 100644 index eee9accd2e29e..0000000000000 --- a/x-pack/platform/plugins/shared/streams_app/public/util/use_debounce.ts +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import useDebounce from 'react-use/lib/useDebounce'; -import { useState } from 'react'; - -export function useDebounced(value: T, debounceDelay: number = 300) { - const [debouncedValue, setValue] = useState(value); - - useDebounce( - () => { - setValue(value); - }, - debounceDelay, - [value, setValue] - ); - - return debouncedValue; -}