Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

export { Streams } from './src/models/streams';
export { IngestBase } from './src/models/ingest/base';
export { IngestBase, type IngestStreamIndexMode } from './src/models/ingest/base';
export { Ingest } from './src/models/ingest';
export { WiredIngest } from './src/models/ingest/wired';
export { ClassicIngest } from './src/models/ingest/classic';
Expand All @@ -24,6 +24,7 @@ export type { StreamType } from './src/helpers/get_stream_type_from_definition';
export { isRootStreamDefinition } from './src/helpers/is_root';
export { isOtelStream } from './src/helpers/is_otel_stream';
export { getIndexPatternsForStream } from './src/helpers/hierarchy_helpers';
export { getDiscoverEsqlQuery } from './src/helpers/get_discover_esql_query';
export {
convertUpsertRequestIntoDefinition,
convertGetResponseIntoUpsertRequest,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import type { IngestStreamIndexMode } from '../models/ingest/base';
import type { Streams } from '../models/streams';
import { getIndexPatternsForStream } from './hierarchy_helpers';

export interface GetDiscoverEsqlQueryOptions {
/**
* The stream definition to generate the query for
*/
definition: Streams.all.Definition;
/**
* The index mode of the stream (from API response)
*/
indexMode?: IngestStreamIndexMode;
/**
* Whether to include METADATA _source (typically for wired streams)
*/
includeMetadata?: boolean;
}

/**
* Generates a base ES|QL query for Discover from a stream definition.
*
* Uses 'TS' source command for TSDB mode streams, 'FROM' otherwise.
* Optionally includes METADATA _source for wired streams.
*
* @param options - Configuration options for query generation
* @returns The ES|QL query string, or undefined if index patterns cannot be determined
*
* @example
* // Basic usage
* getDiscoverEsqlQuery({ definition, indexMode })
* // Returns: "FROM logs,logs.*"
*
* @example
* // With TSDB mode
* getDiscoverEsqlQuery({ definition, indexMode: 'time_series' })
* // Returns: "TS logs,logs.*"
*
* @example
* // With metadata for wired streams
* getDiscoverEsqlQuery({ definition, indexMode, includeMetadata: true })
* // Returns: "FROM logs,logs.* METADATA _source"
*/
export function getDiscoverEsqlQuery(options: GetDiscoverEsqlQueryOptions): string | undefined {
const { definition, indexMode, includeMetadata = false } = options;

const indexPatterns = getIndexPatternsForStream(definition);
if (!indexPatterns) {
return undefined;
}

const sourceCommand = indexMode === 'time_series' ? 'TS' : 'FROM';
const metadataSuffix = includeMetadata ? ' METADATA _source' : '';

return `${sourceCommand} ${indexPatterns.join(', ')}${metadataSuffix}`;
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ type IngestBaseStreamDefaults = {
} & ModelOfSchema<IIngestBaseStreamSchema>;

/* eslint-disable @typescript-eslint/no-namespace */
export type IngestStreamIndexMode = 'standard' | 'time_series' | 'logsdb' | 'lookup';

export namespace IngestBaseStream {
export interface Definition extends BaseStream.Definition {
ingest: IngestBase;
Expand All @@ -127,6 +129,7 @@ export namespace IngestBaseStream {
TDefinition extends IngestBaseStream.Definition = IngestBaseStream.Definition
> extends BaseStream.GetResponse<TDefinition> {
privileges: IngestStreamPrivileges;
index_mode?: IngestStreamIndexMode;
}

export type UpsertRequest<
Expand All @@ -141,13 +144,21 @@ export namespace IngestBaseStream {
}
}

const ingestStreamIndexModeSchema: z.Schema<IngestStreamIndexMode> = z.enum([
'standard',
'time_series',
'logsdb',
'lookup',
]);

const IngestBaseStreamSchema = {
Source: z.object({}),
Definition: z.object({
ingest: IngestBase.right,
}),
GetResponse: z.object({
privileges: ingestStreamPrivilegesSchema,
index_mode: z.optional(ingestStreamIndexModeSchema),
}),
UpsertRequest: z.object({}),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ export async function readStream({
return {
stream: streamDefinition,
privileges,
index_mode: dataStream?.index_mode,
elasticsearch_assets:
dataStream && privileges.manage
? await getUnmanagedElasticsearchAssets({
Expand Down Expand Up @@ -130,6 +131,7 @@ export async function readStream({
rules,
privileges,
queries,
index_mode: dataStream?.index_mode,
effective_lifecycle: findInheritedLifecycle(streamDefinition, ancestors),
effective_settings: getInheritedSettings([...ancestors, streamDefinition]),
inherited_fields: inheritedFields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
isErrorLifecycle,
isDslLifecycle,
Streams,
getIndexPatternsForStream,
getDiscoverEsqlQuery,
} from '@kbn/streams-schema';
import React from 'react';
import type { DiscoverAppLocatorParams } from '@kbn/discover-plugin/common';
Expand Down Expand Up @@ -178,10 +178,11 @@ export function DiscoverBadgeButton({
} = useKibana();
const dataStreamExists =
Streams.WiredStream.GetResponse.is(definition) || definition.data_stream_exists;
const indexPatterns = getIndexPatternsForStream(definition.stream);
const esqlQuery = indexPatterns
? `FROM ${indexPatterns.join(', ')}${isWiredStream ? ' METADATA _source' : ''}`
: undefined;
const esqlQuery = getDiscoverEsqlQuery({
definition: definition.stream,
indexMode: definition.index_mode,
includeMetadata: isWiredStream,
});
const useUrl = share.url.locators.useUrl;

const discoverLink = useUrl<DiscoverAppLocatorParams>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,40 @@ import { type Streams, isDescendantOf } from '@kbn/streams-schema';

import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { AssetImage } from '../asset_image';
import { StreamsList } from '../streams_list';
import { useWiredStreams } from '../../hooks/use_wired_streams';
import { StreamsList, type StreamListItem } from '../streams_list';
import { useKibana } from '../../hooks/use_kibana';
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';

export function ChildStreamList({ definition }: { definition?: Streams.ingest.all.GetResponse }) {
const router = useStreamsAppRouter();
const {
dependencies: {
start: {
streams: { streamsRepositoryClient },
},
},
} = useKibana();

const { wiredStreams } = useWiredStreams();
// Fetch from internal API to get data_stream info for TSDB mode detection
const { value: streamsResponse } = useStreamsAppFetch(
async ({ signal }) =>
streamsRepositoryClient.fetch('GET /internal/streams', {
signal,
}),
[streamsRepositoryClient]
);
Comment thread
tonyghiani marked this conversation as resolved.

const childrenStreams = useMemo(() => {
if (!definition) {
return [];
const childrenStreams = useMemo((): StreamListItem[] | undefined => {
if (!definition || !streamsResponse?.streams) {
return undefined;
}
return wiredStreams?.filter((d) => isDescendantOf(definition.stream.name, d.name));
}, [definition, wiredStreams]);
return streamsResponse.streams
.filter((item) => isDescendantOf(definition.stream.name, item.stream.name))
.map((item) => ({
stream: item.stream,
data_stream: item.data_stream,
}));
}, [definition, streamsResponse?.streams]);

if (definition && childrenStreams?.length === 0) {
return (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
import { css } from '@emotion/css';
import { i18n } from '@kbn/i18n';
import React, { useMemo } from 'react';
import { Streams, getIndexPatternsForStream } from '@kbn/streams-schema';
import { Streams, getDiscoverEsqlQuery, getIndexPatternsForStream } from '@kbn/streams-schema';
import { computeInterval } from '@kbn/visualization-utils';
import type { DurationInputArg1, DurationInputArg2 } from 'moment';
import moment from 'moment';
Expand Down Expand Up @@ -55,19 +55,22 @@ export function StreamChartPanel({ definition }: StreamChartPanelProps) {
);

const queries = useMemo(() => {
if (!indexPatterns) {
const baseQuery = getDiscoverEsqlQuery({
definition: definition.stream,
indexMode: definition.index_mode,
});

if (!baseQuery) {
return undefined;
}

const baseQuery = `FROM ${indexPatterns.join(', ')}`;

const histogramQuery = `${baseQuery} | STATS metric = COUNT(*) BY @timestamp = BUCKET(@timestamp, ${bucketSize})`;

return {
baseQuery,
histogramQuery,
};
}, [bucketSize, indexPatterns]);
}, [bucketSize, definition.stream, definition.index_mode]);

const discoverLink = useMemo(() => {
if (!discoverLocator || !queries?.baseQuery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
import React from 'react';
import { EuiButtonEmpty, EuiFlexGroup, EuiFlexItem, EuiTitle } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import type { Streams, System } from '@kbn/streams-schema';
import { getIndexPatternsForStream } from '@kbn/streams-schema';
import { Streams, type System, getDiscoverEsqlQuery } from '@kbn/streams-schema';
import { conditionToESQL } from '@kbn/streamlang';
import type { DiscoverAppLocatorParams } from '@kbn/discover-plugin/common';
import { DISCOVER_APP_LOCATOR } from '@kbn/discover-plugin/common';
import { useKibana } from '../../../hooks/use_kibana';
import { useStreamDetail } from '../../../hooks/use_stream_detail';
import { SystemEventsSparklineLast24hrs } from './system_events_sparkline';

export const SystemEventsData = ({
Expand All @@ -28,16 +28,24 @@ export const SystemEventsData = ({
start: { share },
},
} = useKibana();
// Get index_mode from the stream detail context (API response)
const { definition: fullDefinition } = useStreamDetail();
const useUrl = share.url.locators.useUrl;

const esqlQuery = `FROM ${getIndexPatternsForStream(definition).join(',')}
| WHERE ${conditionToESQL(system.filter)}`;
const indexMode = Streams.ingest.all.GetResponse.is(fullDefinition)
? fullDefinition.index_mode
: undefined;
const baseQuery = getDiscoverEsqlQuery({ definition, indexMode });
const esqlQuery = baseQuery
? `${baseQuery}
| WHERE ${conditionToESQL(system.filter)}`
: undefined;

const discoverLink = useUrl<DiscoverAppLocatorParams>(
() => ({
id: DISCOVER_APP_LOCATOR,
params: {
query: { esql: esqlQuery },
query: { esql: esqlQuery || '' },
timeRange: { from: 'now-24h', to: 'now' },
},
}),
Expand All @@ -57,7 +65,7 @@ export const SystemEventsData = ({
</EuiTitle>
</EuiFlexItem>
<EuiFlexItem grow={false}>
{discoverLink ? (
{discoverLink && esqlQuery ? (
<EuiButtonEmpty
size="s"
href={discoverLink}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ export function StreamsTreeTable({
{
stream: item.stream,
data_stream_exists: !!item.data_stream,
index_mode: item.data_stream?.index_mode,
} as Streams.ingest.all.GetResponse
}
isWiredStream={item.type === 'wired'}
Expand Down
Loading
Loading