Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

export { Streams } from './src/models/streams';
export { IngestBase } from './src/models/ingest/base';
export { IngestBase, type IngestStreamIndexMode } from './src/models/ingest/base';
export { Ingest } from './src/models/ingest';
export { WiredIngest } from './src/models/ingest/wired';
export { ClassicIngest } from './src/models/ingest/classic';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ type IngestBaseStreamDefaults = {
} & ModelOfSchema<IIngestBaseStreamSchema>;

/* eslint-disable @typescript-eslint/no-namespace */
export type IngestStreamIndexMode = 'standard' | 'time_series' | 'logsdb' | 'lookup';

export namespace IngestBaseStream {
export interface Definition extends BaseStream.Definition {
ingest: IngestBase;
Expand All @@ -127,6 +129,7 @@ export namespace IngestBaseStream {
TDefinition extends IngestBaseStream.Definition = IngestBaseStream.Definition
> extends BaseStream.GetResponse<TDefinition> {
privileges: IngestStreamPrivileges;
index_mode?: IngestStreamIndexMode;
}

export type UpsertRequest<
Expand All @@ -141,13 +144,21 @@ export namespace IngestBaseStream {
}
}

const ingestStreamIndexModeSchema: z.Schema<IngestStreamIndexMode> = z.enum([
'standard',
'time_series',
'logsdb',
'lookup',
]);

const IngestBaseStreamSchema = {
Source: z.object({}),
Definition: z.object({
ingest: IngestBase.right,
}),
GetResponse: z.object({
privileges: ingestStreamPrivilegesSchema,
index_mode: z.optional(ingestStreamIndexModeSchema),
}),
UpsertRequest: z.object({}),
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
getInheritedFieldsFromAncestors,
getInheritedSettings,
findInheritedFailureStore,
type IngestStreamIndexMode,
} from '@kbn/streams-schema';
import type { IScopedClusterClient } from '@kbn/core/server';
import { isNotFoundError } from '@kbn/es-errors';
Expand Down Expand Up @@ -91,6 +92,7 @@ export async function readStream({
return {
stream: streamDefinition,
privileges,
index_mode: dataStream?.index_mode as IngestStreamIndexMode | undefined,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why is this type assertion necessary? It shouldn't be harmful, but it's always nice to prevent assertions as you won't be protected if index_mode changes or is removed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, we should change that.

elasticsearch_assets:
dataStream && privileges.manage
? await getUnmanagedElasticsearchAssets({
Expand Down Expand Up @@ -130,6 +132,7 @@ export async function readStream({
rules,
privileges,
queries,
index_mode: dataStream?.index_mode as IngestStreamIndexMode | undefined,
effective_lifecycle: findInheritedLifecycle(streamDefinition, ancestors),
effective_settings: getInheritedSettings([...ancestors, streamDefinition]),
inherited_fields: inheritedFields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,20 +167,26 @@ export function LifecycleBadge({
export function DiscoverBadgeButton({
definition,
isWiredStream,
isTSDBMode: isTSDBModeProp,

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot This prop is not used anywhere, please remove it

}: {
definition: Streams.ingest.all.GetResponse;
isWiredStream: boolean;
/** When provided from listing data, uses this instead of definition.index_mode */
isTSDBMode?: boolean;
}) {
const {
dependencies: {
start: { share },
},
} = useKibana();
// Use prop if provided (from listing data), otherwise use index_mode from definition (API response)
const isTSDBMode = isTSDBModeProp ?? definition.index_mode === 'time_series';
const dataStreamExists =
Streams.WiredStream.GetResponse.is(definition) || definition.data_stream_exists;
const indexPatterns = getIndexPatternsForStream(definition.stream);
const sourceCommand = isTSDBMode ? 'TS' : 'FROM';
const esqlQuery = indexPatterns
? `FROM ${indexPatterns.join(', ')}${isWiredStream ? ' METADATA _source' : ''}`
? `${sourceCommand} ${indexPatterns.join(', ')}${isWiredStream ? ' METADATA _source' : ''}`
: undefined;
const useUrl = share.url.locators.useUrl;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,40 @@ import { type Streams, isDescendantOf } from '@kbn/streams-schema';

import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { AssetImage } from '../asset_image';
import { StreamsList } from '../streams_list';
import { useWiredStreams } from '../../hooks/use_wired_streams';
import { StreamsList, type StreamListItem } from '../streams_list';
import { useKibana } from '../../hooks/use_kibana';
import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch';

export function ChildStreamList({ definition }: { definition?: Streams.ingest.all.GetResponse }) {
const router = useStreamsAppRouter();
const {
dependencies: {
start: {
streams: { streamsRepositoryClient },
},
},
} = useKibana();

const { wiredStreams } = useWiredStreams();
// Fetch from internal API to get data_stream info for TSDB mode detection
const { value: streamsResponse } = useStreamsAppFetch(
async ({ signal }) =>
streamsRepositoryClient.fetch('GET /internal/streams', {
signal,
}),
[streamsRepositoryClient]
);
Comment thread
tonyghiani marked this conversation as resolved.

const childrenStreams = useMemo(() => {
if (!definition) {
return [];
const childrenStreams = useMemo((): StreamListItem[] | undefined => {
if (!definition || !streamsResponse?.streams) {
return undefined;
}
return wiredStreams?.filter((d) => isDescendantOf(definition.stream.name, d.name));
}, [definition, wiredStreams]);
return streamsResponse.streams
.filter((item) => isDescendantOf(definition.stream.name, item.stream.name))
.map((item) => ({
stream: item.stream,
data_stream: item.data_stream,
}));
}, [definition, streamsResponse?.streams]);

if (definition && childrenStreams?.length === 0) {
return (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,24 @@ export function StreamChartPanel({ definition }: StreamChartPanelProps) {
[data, timeState.asAbsoluteTimeRange]
);

// Use index_mode from API response instead of expensive DataView check
const isTSDBMode = definition.index_mode === 'time_series';

const queries = useMemo(() => {
if (!indexPatterns) {
return undefined;
}

const baseQuery = `FROM ${indexPatterns.join(', ')}`;
const sourceCommand = isTSDBMode ? 'TS' : 'FROM';
const baseQuery = `${sourceCommand} ${indexPatterns.join(', ')}`;

const histogramQuery = `${baseQuery} | STATS metric = COUNT(*) BY @timestamp = BUCKET(@timestamp, ${bucketSize})`;

return {
baseQuery,
histogramQuery,
};
}, [bucketSize, indexPatterns]);
}, [bucketSize, indexPatterns, isTSDBMode]);

const discoverLink = useMemo(() => {
if (!discoverLocator || !queries?.baseQuery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { conditionToESQL } from '@kbn/streamlang';
import type { DiscoverAppLocatorParams } from '@kbn/discover-plugin/common';
import { DISCOVER_APP_LOCATOR } from '@kbn/discover-plugin/common';
import { useKibana } from '../../../hooks/use_kibana';
import { useStreamDetail } from '../../../hooks/use_stream_detail';
import { SystemEventsSparklineLast24hrs } from './system_events_sparkline';

export const SystemEventsData = ({
Expand All @@ -28,9 +29,15 @@ export const SystemEventsData = ({
start: { share },
},
} = useKibana();
// Get index_mode from the stream detail context (API response)
const { definition: fullDefinition } = useStreamDetail();
const isTSDBMode =
Streams.ingest.all.GetResponse.is(fullDefinition) &&
fullDefinition.index_mode === 'time_series';
const useUrl = share.url.locators.useUrl;

const esqlQuery = `FROM ${getIndexPatternsForStream(definition).join(',')}
const sourceCommand = isTSDBMode ? 'TS' : 'FROM';
const esqlQuery = `${sourceCommand} ${getIndexPatternsForStream(definition).join(',')}
| WHERE ${conditionToESQL(system.filter)}`;

const discoverLink = useUrl<DiscoverAppLocatorParams>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ export function StreamsTreeTable({
} as Streams.ingest.all.GetResponse
}
isWiredStream={item.type === 'wired'}
isTSDBMode={item.data_stream?.index_mode === 'time_series'}
/>
),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,50 @@ import {
isDescendantOf,
isRootStreamDefinition,
} from '@kbn/streams-schema';
import type { estypes } from '@elastic/elasticsearch';
import { useStreamsAppRouter } from '../../hooks/use_streams_app_router';
import { NestedView } from '../nested_view';
import { useKibana } from '../../hooks/use_kibana';

export interface StreamListItem {
stream: Streams.all.Definition;
data_stream?: estypes.IndicesDataStream;
}

export interface StreamTree {
name: string;
type: 'wired' | 'root' | 'classic';
stream: Streams.all.Definition;
data_stream?: estypes.IndicesDataStream;
children: StreamTree[];
}

export function asTrees(streams: Streams.all.Definition[]) {
export function asTrees(items: StreamListItem[]) {
const trees: StreamTree[] = [];
const sortedStreams = streams
const sortedItems = items
.slice()
.sort((a, b) => getSegments(a.name).length - getSegments(b.name).length);
.sort((a, b) => getSegments(a.stream.name).length - getSegments(b.stream.name).length);

sortedStreams.forEach((stream) => {
sortedItems.forEach((item) => {
let currentTree = trees;
let existingNode: StreamTree | undefined;
// traverse the tree following the prefix of the current name.
// once we reach the leaf, the current name is added as child - this works because the ids are sorted by depth
while ((existingNode = currentTree.find((node) => isDescendantOf(node.name, stream.name)))) {
while (
(existingNode = currentTree.find((node) => isDescendantOf(node.name, item.stream.name)))
) {
currentTree = existingNode.children;
}

if (!existingNode) {
const newNode: StreamTree = {
name: stream.name,
name: item.stream.name,
children: [],
stream,
type: Streams.ClassicStream.Definition.is(stream)
stream: item.stream,
data_stream: item.data_stream,
type: Streams.ClassicStream.Definition.is(item.stream)
? 'classic'
: isRootStreamDefinition(stream)
: isRootStreamDefinition(item.stream)
? 'root'
: 'wired',
};
Expand All @@ -76,7 +86,7 @@ export function StreamsList({
query,
showControls,
}: {
streams: Streams.all.Definition[] | undefined;
streams: StreamListItem[] | undefined;
query?: string;
showControls: boolean;
}) {
Expand All @@ -88,8 +98,8 @@ export function StreamsList({

const filteredItems = useMemo(() => {
return items
.filter((item) => showClassic || Streams.WiredStream.Definition.is(item))
.filter((item) => !query || item.name.toLowerCase().includes(query.toLowerCase()));
.filter((item) => showClassic || Streams.WiredStream.Definition.is(item.stream))
.filter((item) => !query || item.stream.name.toLowerCase().includes(query.toLowerCase()));
}, [query, items, showClassic]);

const treeView = useMemo(() => asTrees(filteredItems), [filteredItems]);
Expand All @@ -113,7 +123,7 @@ export function StreamsList({
iconType="fold"
size="s"
onClick={() =>
setCollapsed(Object.fromEntries(items.map((item) => [item.name, true])))
setCollapsed(Object.fromEntries(items.map((item) => [item.stream.name, true])))
}
>
{i18n.translate('xpack.streams.streamsTable.collapseAll', {
Expand Down Expand Up @@ -178,19 +188,23 @@ function StreamNode({
[share.url.locators]
);

// Use TSDB mode from data_stream.index_mode directly from listing data
const isTSDBMode = node.data_stream?.index_mode === 'time_series';

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen this about 4 times in the PR and it will probably used more, can we expose a isTSDBMode util from the streams package?

@flash1293 flash1293 Jan 28, 2026

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, we should centralize. We should even go a bit further and centralize the whole base ESQL query generation (definition -> query) instead of having that logic in multiple places, there is no good reason for that.


const discoverUrl = useMemo(() => {
const indexPatterns = getIndexPatternsForStream(node.stream);

if (!discoverLocator || !indexPatterns) {
return undefined;
}

const sourceCommand = isTSDBMode ? 'TS' : 'FROM';
return discoverLocator.getRedirectUrl({
query: {
esql: `FROM ${indexPatterns.join(', ')}`,
esql: `${sourceCommand} ${indexPatterns.join(', ')}`,
},
});
}, [discoverLocator, node]);
}, [discoverLocator, node, isTSDBMode]);

return (
<EuiFlexGroup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ export class StreamsApp {
).toContainText(expectedIlmPolicy);
}

async verifyDiscoverButtonLink(streamName: string) {
async verifyDiscoverButtonLink(streamName: string, sourceCommand: 'FROM' | 'TS' = 'FROM') {
const locator = this.page.locator(
`[data-test-subj="streamsDiscoverActionButton-${streamName}"]`
);
Expand All @@ -170,15 +170,38 @@ export class StreamsApp {
}

// Expect encoded ESQL snippet to appear (basic validation)
// 'FROM <streamName>' should appear URL-encoded
const expectedFragment = encodeURIComponent(`FROM ${streamName}`);
// '<sourceCommand> <streamName>' should appear URL-encoded
const expectedFragment = encodeURIComponent(`${sourceCommand} ${streamName}`);
if (!href.includes(expectedFragment)) {
throw new Error(
`Href for ${streamName} did not contain expected ESQL fragment. href=${href} expectedFragment=${expectedFragment}`
);
}
}

async getDiscoverButtonLinkSourceCommand(streamName: string): Promise<'FROM' | 'TS' | null> {
const locator = this.page.locator(
`[data-test-subj="streamsDiscoverActionButton-${streamName}"]`
);
await locator.waitFor();

const href = await locator.getAttribute('href');
if (!href) {
return null;
}

// Check which source command is used in the URL
const fromFragment = encodeURIComponent(`FROM ${streamName}`);
const tsFragment = encodeURIComponent(`TS ${streamName}`);

if (href.includes(tsFragment)) {
return 'TS';
} else if (href.includes(fromFragment)) {
return 'FROM';
}
return null;
}

async verifyStreamsAreInTable(streamNames: string[]) {
for (const name of streamNames) {
await expect(
Expand Down
Loading