diff --git a/api_docs/kbn_streams_schema.devdocs.json b/api_docs/kbn_streams_schema.devdocs.json index fd75b8c2c647a..19a33601902c3 100644 --- a/api_docs/kbn_streams_schema.devdocs.json +++ b/api_docs/kbn_streams_schema.devdocs.json @@ -1593,10 +1593,10 @@ }, { "parentPluginId": "@kbn/streams-schema", - "id": "def-common.isUnWiredStreamGetResponse", + "id": "def-common.isUnwiredStreamGetResponse", "type": "Function", "tags": [], - "label": "isUnWiredStreamGetResponse", + "label": "isUnwiredStreamGetResponse", "description": [], "signature": [ " = z.union([ + ingestStreamGetResponseSchema, + groupStreamGetResponseSchema, +]); + +export const streamUpsertRequestSchema: z.Schema = z.union([ + ingestStreamUpsertRequestSchema, + groupStreamUpsertRequestSchema, +]); + +export const isWiredStreamGetResponse = createIsNarrowSchema( + streamGetResponseSchema, + wiredStreamGetResponseSchema +); + +export const isUnwiredStreamGetResponse = createIsNarrowSchema( + streamGetResponseSchema, + unwiredStreamGetResponseSchema +); + +export const asWiredStreamGetResponse = createAsSchemaOrThrow( + streamGetResponseSchema, + wiredStreamGetResponseSchema +); + +export const asUnwiredStreamGetResponse = createAsSchemaOrThrow( + streamGetResponseSchema, + unwiredStreamGetResponseSchema +); -export const streamUpsertRequestSchema: z.Schema = - ingestStreamUpsertRequestSchema; +export const asIngestStreamGetResponse = createAsSchemaOrThrow( + streamGetResponseSchema, + ingestStreamGetResponseSchema +); -export type StreamGetResponse = IngestStreamGetResponse; -export type StreamUpsertRequest = IngestStreamUpsertRequest; +export type StreamGetResponse = IngestStreamGetResponse | GroupStreamGetResponse; +export type StreamUpsertRequest = IngestStreamUpsertRequest | GroupStreamUpsertRequest; diff --git a/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/core.ts b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/core.ts index 477eb7711218b..b77ed58d4c4d1 100644 --- a/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/core.ts +++ b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/core.ts @@ -8,9 +8,13 @@ import { z } from '@kbn/zod'; import { createIsNarrowSchema } from '../helpers'; import { IngestStreamDefinition, ingestStreamDefinitionSchema } from './ingest'; +import { GroupStreamDefinition, groupStreamDefinitionSchema } from './group'; -export type StreamDefinition = IngestStreamDefinition; +export type StreamDefinition = IngestStreamDefinition | GroupStreamDefinition; -export const streamDefinitionSchema: z.Schema = ingestStreamDefinitionSchema; +export const streamDefinitionSchema: z.Schema = z.union([ + ingestStreamDefinitionSchema, + groupStreamDefinitionSchema, +]); export const isStreamDefinition = createIsNarrowSchema(z.unknown(), streamDefinitionSchema); diff --git a/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/group/api.ts b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/group/api.ts new file mode 100644 index 0000000000000..0d129c0e1e995 --- /dev/null +++ b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/group/api.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { z } from '@kbn/zod'; +import { + StreamGetResponseBase, + streamGetResponseSchemaBase, + StreamUpsertRequestBase, + streamUpsertRequestSchemaBase, +} from '../base/api'; +import { GroupStreamDefinitionBase, groupStreamDefinitionBaseSchema } from './base'; + +/** + * Group get response + */ +interface GroupStreamGetResponse extends StreamGetResponseBase { + stream: GroupStreamDefinitionBase; +} + +const groupStreamGetResponseSchema: z.Schema = z.intersection( + streamGetResponseSchemaBase, + z.object({ + stream: groupStreamDefinitionBaseSchema, + }) +); + +/** + * Group upsert request + */ +interface GroupStreamUpsertRequest extends StreamUpsertRequestBase { + stream: GroupStreamDefinitionBase; +} + +const groupStreamUpsertRequestSchema: z.Schema = z.intersection( + streamUpsertRequestSchemaBase, + z.object({ + stream: groupStreamDefinitionBaseSchema, + }) +); + +export { + type GroupStreamGetResponse, + type GroupStreamUpsertRequest, + groupStreamGetResponseSchema, + groupStreamUpsertRequestSchema, +}; diff --git a/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/group/base.ts b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/group/base.ts new file mode 100644 index 0000000000000..ad9ee05b58d19 --- /dev/null +++ b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/group/base.ts @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { NonEmptyString } from '@kbn/zod-helpers'; +import { StreamDefinitionBase } from '../base'; + +interface GroupBase { + description?: string; + members: string[]; +} + +const groupBaseSchema: z.Schema = z.object({ + description: z.optional(z.string()), + members: z.array(NonEmptyString), +}); + +interface GroupStreamDefinitionBase { + group: GroupBase; +} + +const groupStreamDefinitionBaseSchema: z.Schema = z.object({ + group: groupBaseSchema, +}); + +type GroupStreamDefinition = StreamDefinitionBase & GroupStreamDefinitionBase; + +const groupStreamDefinitionSchema: z.Schema = z.intersection( + z.object({ name: NonEmptyString }), + groupStreamDefinitionBaseSchema +); + +export { + type GroupBase, + type GroupStreamDefinitionBase, + type GroupStreamDefinition, + groupBaseSchema, + groupStreamDefinitionBaseSchema, + groupStreamDefinitionSchema, +}; diff --git a/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/group/index.ts b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/group/index.ts new file mode 100644 index 0000000000000..145a9ec410405 --- /dev/null +++ b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/group/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './base'; +export * from './api'; diff --git a/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/helpers.ts b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/helpers.ts index 6330cab2a134d..a9a5e7ef24ffc 100644 --- a/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/helpers.ts +++ b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/helpers.ts @@ -4,9 +4,10 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ - -import { createIsNarrowSchema } from '../helpers'; +import { z } from '@kbn/zod'; +import { createAsSchemaOrThrow, createIsNarrowSchema } from '../helpers'; import { streamDefinitionSchema } from './core'; +import { groupStreamDefinitionBaseSchema, groupStreamDefinitionSchema } from './group'; import { ingestStreamDefinitionSchema, unwiredStreamDefinitionSchema, @@ -23,11 +24,31 @@ export const isWiredStreamDefinition = createIsNarrowSchema( wiredStreamDefinitionSchema ); +export const asIngestStreamDefinition = createAsSchemaOrThrow( + streamDefinitionSchema, + ingestStreamDefinitionSchema +); + +export const asWiredStreamDefinition = createAsSchemaOrThrow( + streamDefinitionSchema, + wiredStreamDefinitionSchema +); + export const isUnwiredStreamDefinition = createIsNarrowSchema( streamDefinitionSchema, unwiredStreamDefinitionSchema ); +export const isGroupStreamDefinition = createIsNarrowSchema( + streamDefinitionSchema, + groupStreamDefinitionSchema +); + +export const isGroupStreamDefinitionBase = createIsNarrowSchema( + z.unknown(), + groupStreamDefinitionBaseSchema +); + export const isRootStreamDefinition = createIsNarrowSchema( streamDefinitionSchema, wiredStreamDefinitionSchema.refine((stream) => { diff --git a/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/index.ts b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/index.ts index d4d6276fee51b..41df083340be8 100644 --- a/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/index.ts +++ b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/index.ts @@ -11,3 +11,4 @@ export * from './legacy'; export * from './api'; export * from './core'; export * from './helpers'; +export * from './group'; diff --git a/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/ingest/api.ts b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/ingest/api.ts index 0803c848ad663..0a4231c64cfe2 100644 --- a/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/ingest/api.ts +++ b/x-pack/solutions/observability/packages/kbn-streams-schema/src/models/ingest/api.ts @@ -26,7 +26,6 @@ import { wiredStreamDefinitionSchemaBase, } from './base'; import { ElasticsearchAsset, elasticsearchAssetSchema } from './common'; -import { createIsNarrowSchema, createAsSchemaOrThrow } from '../../helpers'; import { UnwiredIngestStreamEffectiveLifecycle, WiredIngestStreamEffectiveLifecycle, @@ -146,33 +145,12 @@ const ingestStreamGetResponseSchema: z.Schema = z.union unwiredStreamGetResponseSchema, ]); -const isWiredStreamGetResponse = createIsNarrowSchema( - ingestStreamGetResponseSchema, - wiredStreamGetResponseSchema -); - -const isUnWiredStreamGetResponse = createIsNarrowSchema( - ingestStreamGetResponseSchema, - unwiredStreamGetResponseSchema -); - -const asWiredStreamGetResponse = createAsSchemaOrThrow( - ingestStreamGetResponseSchema, - wiredStreamGetResponseSchema -); - -const asUnwiredStreamGetResponse = createAsSchemaOrThrow( - ingestStreamGetResponseSchema, - unwiredStreamGetResponseSchema -); - export { ingestStreamUpsertRequestSchema, ingestUpsertRequestSchema, - isWiredStreamGetResponse, - isUnWiredStreamGetResponse, - asWiredStreamGetResponse, - asUnwiredStreamGetResponse, + ingestStreamGetResponseSchema, + wiredStreamGetResponseSchema, + unwiredStreamGetResponseSchema, type IngestGetResponse, type IngestStreamGetResponse, type IngestStreamUpsertRequest, diff --git a/x-pack/solutions/observability/packages/utils_server/es/storage/index.ts b/x-pack/solutions/observability/packages/utils_server/es/storage/index.ts index fde2e0fa966b1..d2051b720dfe2 100644 --- a/x-pack/solutions/observability/packages/utils_server/es/storage/index.ts +++ b/x-pack/solutions/observability/packages/utils_server/es/storage/index.ts @@ -52,7 +52,7 @@ export type StorageClientBulkOperation = } | { delete: { _id: string } }; -export type StorageClientBulkRequest> = Omit< +export type StorageClientBulkRequest = Omit< BulkRequest, 'operations' | 'index' > & { @@ -80,21 +80,20 @@ export type StorageClientIndexRequest = Omit< export type StorageClientIndexResponse = IndexResponse; export type StorageClientGetRequest = Omit; -export type StorageClientGetResponse> = - GetResponse; +export type StorageClientGetResponse = GetResponse; -export type StorageClientSearch = < +export type StorageClientSearch = < TSearchRequest extends StorageClientSearchRequest >( request: TSearchRequest -) => Promise, TSearchRequest>>; +) => Promise>; -export type StorageClientBulk = ( - request: StorageClientBulkRequest> +export type StorageClientBulk = ( + request: StorageClientBulkRequest ) => Promise; -export type StorageClientIndex = ( - request: StorageClientIndexRequest> +export type StorageClientIndex = ( + request: StorageClientIndexRequest ) => Promise; export type StorageClientDelete = ( @@ -103,22 +102,50 @@ export type StorageClientDelete = ( export type StorageClientClean = () => Promise; -export type StorageClientGet = ( +export type StorageClientGet = ( request: StorageClientGetRequest -) => Promise>>; +) => Promise>; export type StorageClientExistsIndex = () => Promise; -export interface IStorageClient { - search: StorageClientSearch; - bulk: StorageClientBulk; - index: StorageClientIndex; +export interface InternalIStorageClient { + search: StorageClientSearch; + bulk: StorageClientBulk; + index: StorageClientIndex; delete: StorageClientDelete; clean: StorageClientClean; - get: StorageClientGet; + get: StorageClientGet; existsIndex: StorageClientExistsIndex; } +type UnionKeys = T extends T ? keyof T : never; +type Exact = T extends U + ? Exclude, UnionKeys> extends never + ? true + : false + : false; + +// The storage settings need to support the application payload type, but it's OK if the +// storage document can hold more fields than the application document. +// To keep the type safety of the application type in the consuming code, both the storage +// settings and the application type are passed to the IStorageClient type. +// The IStorageClient type then checks if the application type is a subset of the storage +// document type. If this is not the case, the IStorageClient type is set to never, which +// will cause a type error in the consuming code. +export type IStorageClient = Exact< + ApplicationDocument, + Partial> +> extends true + ? InternalIStorageClient> + : never; + +export type SimpleIStorageClient = IStorageClient< + TStorageSettings, + Omit, '_id'> +>; + +export type ApplicationDocument = TApplicationType & { _id: string }; + export type StorageDocumentOf = StorageFieldTypeOf<{ type: 'object'; properties: TStorageSettings['schema']['properties']; diff --git a/x-pack/solutions/observability/packages/utils_server/es/storage/index_adapter/index.ts b/x-pack/solutions/observability/packages/utils_server/es/storage/index_adapter/index.ts index ac35a0aaf3dad..233f6797c9092 100644 --- a/x-pack/solutions/observability/packages/utils_server/es/storage/index_adapter/index.ts +++ b/x-pack/solutions/observability/packages/utils_server/es/storage/index_adapter/index.ts @@ -27,13 +27,14 @@ import { StorageClientIndex, StorageClientIndexResponse, StorageClientSearch, - IStorageClient, StorageClientGet, StorageClientExistsIndex, StorageDocumentOf, StorageClientSearchResponse, StorageClientClean, StorageClientCleanResponse, + ApplicationDocument, + InternalIStorageClient, } from '..'; import { getSchemaVersion } from '../get_schema_version'; import { StorageMappingProperty } from '../types'; @@ -94,7 +95,7 @@ function wrapEsCall(p: Promise): Promise { * - Index Lifecycle Management * - Schema upgrades w/ fallbacks */ -export class StorageIndexAdapter { +export class StorageIndexAdapter { private readonly logger: Logger; constructor( private readonly esClient: ElasticsearchClient, @@ -316,7 +317,7 @@ export class StorageIndexAdapter return []; } - private search: StorageClientSearch = async (request) => { + private search: StorageClientSearch> = async (request) => { return (await wrapEsCall( this.esClient .search({ @@ -345,10 +346,10 @@ export class StorageIndexAdapter } throw error; }) - )) as unknown as ReturnType>; + )) as unknown as ReturnType>>; }; - private index: StorageClientIndex = async ({ + private index: StorageClientIndex> = async ({ id, refresh = 'wait_for', ...request @@ -387,7 +388,7 @@ export class StorageIndexAdapter }); }; - private bulk: StorageClientBulk = ({ + private bulk: StorageClientBulk> = ({ operations, refresh = 'wait_for', ...request @@ -402,7 +403,7 @@ export class StorageIndexAdapter _id: operation.index._id, }, }, - operation.index.document, + operation.index.document as {}, ]; } @@ -518,7 +519,10 @@ export class StorageIndexAdapter return { acknowledged: true, result: 'not_found' }; }; - private get: StorageClientGet = async ({ id, ...request }) => { + private get: StorageClientGet> = async ({ + id, + ...request + }) => { const response = await this.search({ track_total_hits: false, size: 1, @@ -558,7 +562,7 @@ export class StorageIndexAdapter _id: hit._id!, _index: hit._index, found: true, - _source: hit._source as StorageDocumentOf, + _source: hit._source as ApplicationDocument, _ignored: hit._ignored, _primary_term: hit._primary_term, _routing: hit._routing, @@ -574,7 +578,7 @@ export class StorageIndexAdapter }); }; - getClient(): IStorageClient { + getClient(): InternalIStorageClient> { return { bulk: this.bulk, delete: this.delete, @@ -586,3 +590,6 @@ export class StorageIndexAdapter }; } } + +export type SimpleStorageIndexAdapter = + StorageIndexAdapter, '_id'>>; diff --git a/x-pack/solutions/observability/packages/utils_server/es/storage/index_adapter/integration_tests/index.test.ts b/x-pack/solutions/observability/packages/utils_server/es/storage/index_adapter/integration_tests/index.test.ts index 4574a777a1b6a..4b9327d4cc5c8 100644 --- a/x-pack/solutions/observability/packages/utils_server/es/storage/index_adapter/integration_tests/index.test.ts +++ b/x-pack/solutions/observability/packages/utils_server/es/storage/index_adapter/integration_tests/index.test.ts @@ -11,7 +11,7 @@ import { type TestKibanaUtils, } from '@kbn/core-test-helpers-kbn-server'; import { - IStorageClient, + SimpleIStorageClient, StorageClientBulkResponse, StorageClientIndexResponse, StorageIndexAdapter, @@ -22,6 +22,7 @@ import { httpServerMock } from '@kbn/core/server/mocks'; import * as getSchemaVersionModule from '../../get_schema_version'; import { isResponseError } from '@kbn/es-errors'; import { IndicesGetResponse } from '@elastic/elasticsearch/lib/api/types'; +import { SimpleStorageIndexAdapter } from '..'; const TEST_INDEX_NAME = 'test_index'; @@ -56,8 +57,8 @@ describe('StorageIndexAdapter', () => { }, } satisfies StorageSettings; - let adapter: StorageIndexAdapter; - let client: IStorageClient; + let adapter: SimpleStorageIndexAdapter; + let client: SimpleIStorageClient; describe('with a clean Elasticsearch instance', () => { beforeAll(async () => { @@ -406,7 +407,7 @@ describe('StorageIndexAdapter', () => { function createStorageIndexAdapter( settings: TStorageSettings - ): StorageIndexAdapter { + ): SimpleStorageIndexAdapter { return new StorageIndexAdapter(esClient, loggerMock, settings); } diff --git a/x-pack/solutions/observability/packages/utils_server/es/storage/types.ts b/x-pack/solutions/observability/packages/utils_server/es/storage/types.ts index c13487a4d2abf..08d8fecff3cf0 100644 --- a/x-pack/solutions/observability/packages/utils_server/es/storage/types.ts +++ b/x-pack/solutions/observability/packages/utils_server/es/storage/types.ts @@ -91,7 +91,7 @@ type PrimitiveOf = { float: number; object: TProperty extends { properties: Record } ? { - [key in keyof TProperty['properties']]: StorageFieldTypeOf; + [key in keyof TProperty['properties']]?: StorageFieldTypeOf; } : object; }[TProperty['type']]; diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/assets/asset_client.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/assets/asset_client.ts index 1bc1385a081b3..2406c56c809fe 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/assets/asset_client.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/assets/asset_client.ts @@ -8,7 +8,7 @@ import { SanitizedRule } from '@kbn/alerting-plugin/common'; import { RulesClient } from '@kbn/alerting-plugin/server'; import { SavedObject, SavedObjectsClientContract } from '@kbn/core/server'; import { termQuery } from '@kbn/observability-utils-server/es/queries/term_query'; -import { IStorageClient, StorageDocumentOf } from '@kbn/observability-utils-server/es/storage'; +import { IStorageClient } from '@kbn/observability-utils-server/es/storage'; import { keyBy } from 'lodash'; import objectHash from 'object-hash'; import pLimit from 'p-limit'; @@ -64,7 +64,7 @@ function getAssetDocument({ entityId, entityType, assetType, -}: AssetLink & { entityId: string; entityType: string }): StorageDocumentOf { +}: AssetLink & { entityId: string; entityType: string }) { const doc = { 'asset.id': assetId, 'asset.type': assetType, @@ -87,10 +87,17 @@ interface AssetBulkDeleteOperation { export type AssetBulkOperation = AssetBulkIndexOperation | AssetBulkDeleteOperation; +export interface StoredAssetLink { + 'asset.id': string; + 'asset.type': AssetType; + 'entity.id': string; + 'entity.type': string; +} + export class AssetClient { constructor( private readonly clients: { - storageClient: IStorageClient; + storageClient: IStorageClient; soClient: SavedObjectsClientContract; rulesClient: RulesClient; } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/assets/asset_service.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/assets/asset_service.ts index 0a54e0abbd07a..47bd57433b9d8 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/assets/asset_service.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/assets/asset_service.ts @@ -8,8 +8,8 @@ import { CoreSetup, KibanaRequest, Logger } from '@kbn/core/server'; import { StorageIndexAdapter } from '@kbn/observability-utils-server/es/storage'; import { StreamsPluginStartDependencies } from '../../../types'; -import { AssetClient } from './asset_client'; -import { assetStorageSettings } from './storage_settings'; +import { AssetClient, StoredAssetLink } from './asset_client'; +import { AssetStorageSettings, assetStorageSettings } from './storage_settings'; export class AssetService { constructor( @@ -20,7 +20,7 @@ export class AssetService { async getClientWithRequest({ request }: { request: KibanaRequest }): Promise { const [coreStart, pluginsStart] = await this.coreSetup.getStartServices(); - const adapter = new StorageIndexAdapter( + const adapter = new StorageIndexAdapter( coreStart.elasticsearch.client.asInternalUser, this.logger.get('assets'), assetStorageSettings diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/client.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/client.ts index 0bfb31818632a..a0714e297679c 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/client.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/client.ts @@ -15,15 +15,19 @@ import type { IScopedClusterClient, Logger } from '@kbn/core/server'; import { isResponseError } from '@kbn/es-errors'; import { Condition, + GroupStreamDefinition, IngestStreamLifecycle, StreamDefinition, StreamUpsertRequest, UnwiredStreamDefinition, WiredStreamDefinition, + asIngestStreamDefinition, assertsSchema, getAncestors, getParentId, isChildOf, + isGroupStreamDefinition, + isIngestStreamDefinition, isDslLifecycle, isIlmLifecycle, isInheritLifecycle, @@ -34,6 +38,7 @@ import { } from '@kbn/streams-schema'; import { cloneDeep, keyBy, omit, orderBy } from 'lodash'; import { AssetClient } from './assets/asset_client'; +import { ForbiddenMemberTypeError } from './errors/forbidden_member_type_error'; import { syncUnwiredStreamDefinitionObjects, syncWiredStreamDefinitionObjects, @@ -321,6 +326,10 @@ export class StreamsClient { validateStreamTypeChanges(existingDefinition, definition); } + if (isGroupStreamDefinition(definition)) { + await this.assertValidGroupMembers({ definition }); + } + if (isRootStreamDefinition(definition)) { // only allow selective updates to a root stream validateRootStreamChanges( @@ -444,6 +453,29 @@ export class StreamsClient { return { parentDefinition }; } + /** + * Validates the members of the group streams to ensure they are NOT + * GroupStreamDefinitions + */ + async assertValidGroupMembers({ definition }: { definition: GroupStreamDefinition }) { + const { members } = definition.group; + + if (members.includes(definition.name)) { + throw new ForbiddenMemberTypeError('Group streams can not include themselves as a member'); + } + + await Promise.all( + members.map(async (name) => { + const memberStream = await this.getStream(name); + if (isGroupStreamDefinition(memberStream)) { + throw new ForbiddenMemberTypeError( + `Group streams can not be a member of a group, please remove [${name}]` + ); + } + }) + ); + } + /** * Forks a stream into a child with a specific condition. * It mostly defers to `upsertStream` for its validations, @@ -465,7 +497,7 @@ export class StreamsClient { name: string; if: Condition; }): Promise { - const parentDefinition = await this.getStream(parent); + const parentDefinition = asIngestStreamDefinition(await this.getStream(parent)); const childDefinition: WiredStreamDefinition = { name, @@ -526,30 +558,44 @@ export class StreamsClient { * Returns a stream definition for the given name: * - if a wired stream definition exists * - if an ingest stream definition exists - * - if a data stream exists (creates an ingest - * definition on the fly) + * - if a data stream exists (creates an ingest definition on the fly) + * - if a group stream definition exists * * Throws when: * - no definition is found * - the user does not have access to the stream */ async getStream(name: string): Promise { - const definition = await this.getStoredStreamDefinition(name) - .catch(async (error) => { + try { + const response = await this.dependencies.storageClient.get({ id: name }); + + const streamDefinition = response._source; + assertsSchema(streamDefinitionSchema, streamDefinition); + + if (isIngestStreamDefinition(streamDefinition)) { + const privileges = await checkAccess({ + id: name, + scopedClusterClient: this.dependencies.scopedClusterClient, + }); + if (!privileges.read) { + throw new DefinitionNotFoundError(`Stream definition for ${name} not found`); + } + } + return streamDefinition; + } catch (error) { + try { if (isElasticsearch404(error)) { const dataStream = await this.getDataStream(name); return this.getDataStreamAsIngestStream(dataStream); } throw error; - }) - .catch(async (error) => { - if (isElasticsearch404(error)) { + } catch (e) { + if (isElasticsearch404(e)) { throw new DefinitionNotFoundError(`Cannot find stream ${name}`); } - throw error; - }); - - return definition; + throw e; + } + } } private async getStoredStreamDefinition(name: string): Promise { @@ -678,11 +724,14 @@ export class StreamsClient { }); const privileges = await checkAccessBulk({ - ids: streams.map((stream) => stream.name), + ids: streams + .filter((stream) => !isGroupStreamDefinition(stream)) + .map((stream) => stream.name), scopedClusterClient, }); return streams.filter((stream) => { + if (isGroupStreamDefinition(stream)) return true; return privileges[stream.name]?.read === true; }); } @@ -695,13 +744,13 @@ export class StreamsClient { private async deleteStreamFromDefinition(definition: StreamDefinition): Promise { const { assetClient, logger, scopedClusterClient } = this.dependencies; - if (!isWiredStreamDefinition(definition)) { + if (isUnwiredStreamDefinition(definition)) { await deleteUnmanagedStreamObjects({ scopedClusterClient, id: definition.name, logger, }); - } else { + } else if (isWiredStreamDefinition(definition)) { const parentId = getParentId(definition.name); // need to update parent first to cut off documents streaming down @@ -763,15 +812,20 @@ export class StreamsClient { * Also verifies whether the user has access to the stream. */ async deleteStream(name: string): Promise { - const [definition, access] = await Promise.all([ - this.getStream(name).catch((error) => { - if (isDefinitionNotFoundError(error)) { - return undefined; - } - throw error; - }), - checkAccess({ id: name, scopedClusterClient: this.dependencies.scopedClusterClient }), - ]); + const definition = await this.getStream(name).catch((error) => { + if (isDefinitionNotFoundError(error)) { + return undefined; + } + throw error; + }); + + const access = + definition && isGroupStreamDefinition(definition) + ? { write: true, read: true } + : await checkAccess({ + id: name, + scopedClusterClient: this.dependencies.scopedClusterClient, + }); if (!access.write) { throw new SecurityError(`Cannot delete stream, insufficient privileges`); @@ -781,9 +835,11 @@ export class StreamsClient { return { acknowledged: true, result: 'noop' }; } - const parentId = getParentId(name); - if (isWiredStreamDefinition(definition) && !parentId) { - throw new MalformedStreamIdError('Cannot delete root stream'); + if (isWiredStreamDefinition(definition)) { + const parentId = getParentId(name); + if (!parentId) { + throw new MalformedStreamIdError('Cannot delete root stream'); + } } await this.deleteStreamFromDefinition(definition); @@ -794,13 +850,7 @@ export class StreamsClient { private async updateStoredStream(definition: StreamDefinition) { return this.dependencies.storageClient.index({ id: definition.name, - document: omit( - definition, - 'elasticsearch_assets', - 'dashboards', - 'inherited_fields', - 'lifecycle' - ), + document: definition, }); } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/forbidden_member_type_error.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/forbidden_member_type_error.ts new file mode 100644 index 0000000000000..e210de55f6c9b --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/forbidden_member_type_error.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { StatusError } from './status_error'; + +export class ForbiddenMemberTypeError extends StatusError { + constructor(message: string) { + super(message, 400); + this.name = 'ForbiddenMemberType'; + } +} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/validate_stream.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/validate_stream.ts index d9d8f11730126..0f142e4314b9c 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/validate_stream.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/validate_stream.ts @@ -9,6 +9,7 @@ import { StreamDefinition, WiredStreamDefinition, isIlmLifecycle, + isIngestStreamDefinition, isInheritLifecycle, isUnwiredStreamDefinition, isWiredStreamDefinition, @@ -97,6 +98,9 @@ export function validateStreamChildrenChanges( } export function validateStreamLifecycle(definition: StreamDefinition, isServerless: boolean) { + if (!isIngestStreamDefinition(definition)) { + return; + } const lifecycle = definition.ingest.lifecycle; if (isServerless && isIlmLifecycle(lifecycle)) { diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts index 18d35bc90e76b..da7a30aedd3e7 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts @@ -6,6 +6,7 @@ */ import { + IngestStreamDefinition, StreamDefinition, getParentId, isRoot, @@ -49,7 +50,7 @@ export function generateIngestPipeline( value: definition.name, }, }, - ...formatToIngestProcessors(definition.ingest.processing), + ...((isWiredStream && formatToIngestProcessors(definition.ingest.processing)) || []), { pipeline: { name: `${id}@stream.reroutes`, @@ -65,7 +66,7 @@ export function generateIngestPipeline( }; } -export function generateClassicIngestPipelineBody(definition: StreamDefinition) { +export function generateClassicIngestPipelineBody(definition: IngestStreamDefinition) { return { processors: formatToIngestProcessors(definition.ingest.processing), _meta: { diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts index 63b9ac21cada6..bd349272c6379 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts @@ -5,13 +5,13 @@ * 2.0. */ -import { StreamDefinition } from '@kbn/streams-schema'; +import { IngestStreamDefinition } from '@kbn/streams-schema'; import { ASSET_VERSION } from '../../../../common/constants'; import { conditionToPainless } from '../helpers/condition_to_painless'; import { getReroutePipelineName } from './name'; interface GenerateReroutePipelineParams { - definition: StreamDefinition; + definition: IngestStreamDefinition; } export function generateReroutePipeline({ definition }: GenerateReroutePipelineParams) { diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/service.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/service.ts index 9208853e2c584..b79159d0cfaa3 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/service.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/service.ts @@ -12,6 +12,7 @@ import { StorageSettings, types, } from '@kbn/observability-utils-server/es/storage'; +import { StreamDefinition } from '@kbn/streams-schema'; import type { StreamsPluginStartDependencies } from '../../types'; import { StreamsClient } from './client'; import { AssetClient } from './assets/asset_client'; @@ -22,12 +23,13 @@ export const streamsStorageSettings = { properties: { name: types.keyword(), ingest: types.object({ enabled: false }), + group: types.object({ enabled: false }), }, }, } satisfies StorageSettings; export type StreamsStorageSettings = typeof streamsStorageSettings; -export type StreamsStorageClient = IStorageClient; +export type StreamsStorageClient = IStorageClient; export class StreamsService { constructor( @@ -50,7 +52,7 @@ export class StreamsService { const isServerless = coreStart.elasticsearch.getCapabilities().serverless; - const storageAdapter = new StorageIndexAdapter( + const storageAdapter = new StorageIndexAdapter( scopedClusterClient.asInternalUser, logger, streamsStorageSettings diff --git a/x-pack/solutions/observability/plugins/streams/server/plugin.ts b/x-pack/solutions/observability/plugins/streams/server/plugin.ts index ce4db83e6ea01..85fe61c86c93b 100644 --- a/x-pack/solutions/observability/plugins/streams/server/plugin.ts +++ b/x-pack/solutions/observability/plugins/streams/server/plugin.ts @@ -86,7 +86,12 @@ export class StreamsPlugin const scopedClusterClient = coreStart.elasticsearch.client.asScoped(request); const soClient = coreStart.savedObjects.getScopedClient(request); - return { scopedClusterClient, soClient, assetClient, streamsClient }; + return { + scopedClusterClient, + soClient, + assetClient, + streamsClient, + }; }, }, core, diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/read_stream.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/read_stream.ts index 1189e04924c2e..b750252b49a44 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/read_stream.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/read_stream.ts @@ -6,9 +6,10 @@ */ import { - IngestStreamGetResponse, InheritedFieldDefinition, + StreamGetResponse, WiredStreamGetResponse, + isGroupStreamDefinition, isUnwiredStreamDefinition, } from '@kbn/streams-schema'; import { IScopedClusterClient } from '@kbn/core/server'; @@ -30,14 +31,25 @@ export async function readStream({ assetClient: AssetClient; streamsClient: StreamsClient; scopedClusterClient: IScopedClusterClient; -}): Promise { - const [streamDefinition, dashboards, ancestors, dataStream] = await Promise.all([ +}): Promise { + const [streamDefinition, dashboards] = await Promise.all([ streamsClient.getStream(name), assetClient.getAssetIds({ entityId: name, entityType: 'stream', assetType: 'dashboard', }), + ]); + + if (isGroupStreamDefinition(streamDefinition)) { + return { + stream: streamDefinition, + dashboards, + }; + } + + // These queries are only relavant for IngestStreams + const [ancestors, dataStream] = await Promise.all([ streamsClient.getAncestors(name), streamsClient.getDataStream(name).catch((e) => { if (e.statusCode === 404) { diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/route.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/route.ts index ee2deb2c63115..c8a95e3f23206 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/route.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/route.ts @@ -7,6 +7,7 @@ import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types'; import { + isGroupStreamDefinition, StreamDefinition, StreamGetResponse, streamUpsertRequestSchema, @@ -76,9 +77,12 @@ export const streamDetailRoute = createServerRoute({ const { scopedClusterClient, streamsClient } = await getScopedClients({ request }); const streamEntity = await streamsClient.getStream(params.path.id); + const indexPattern = isGroupStreamDefinition(streamEntity) + ? streamEntity.group.members.join(',') + : streamEntity.name; // check doc count const docCountResponse = await scopedClusterClient.asCurrentUser.search({ - index: streamEntity.name, + index: indexPattern, body: { track_total_hits: true, query: { @@ -144,7 +148,6 @@ export const editStreamRoute = createServerRoute({ }), handler: async ({ params, request, getScopedClients }): Promise => { const { streamsClient } = await getScopedClients({ request }); - return await streamsClient.upsertStream({ request: params.body, name: params.path.id, diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_overview/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_overview/index.tsx index 9050d234ceffc..3e0f5393fb118 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_overview/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_overview/index.tsx @@ -21,7 +21,7 @@ import React, { useMemo } from 'react'; import { css } from '@emotion/css'; import { IngestStreamGetResponse, - isUnWiredStreamGetResponse, + isUnwiredStreamGetResponse, isWiredStreamDefinition, } from '@kbn/streams-schema'; import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range'; @@ -132,7 +132,7 @@ export function StreamDetailOverview({ definition }: { definition?: IngestStream async ({ signal }) => { if ( !definition || - (isUnWiredStreamGetResponse(definition) && !definition.data_stream_exists) + (isUnwiredStreamGetResponse(definition) && !definition.data_stream_exists) ) { return undefined; } diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_view/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_view/index.tsx index 59181dbd080c8..154bf202a4e75 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_view/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_view/index.tsx @@ -5,6 +5,7 @@ * 2.0. */ import { i18n } from '@kbn/i18n'; +import { isUnwiredStreamGetResponse, isWiredStreamGetResponse } from '@kbn/streams-schema'; import React from 'react'; import { useKibana } from '../../hooks/use_kibana'; import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch'; @@ -35,15 +36,45 @@ export function StreamDetailView() { refresh, loading, } = useStreamsAppFetch( - ({ signal }) => { - return streamsRepositoryClient.fetch('GET /api/streams/{id}', { - signal, - params: { - path: { - id: key, + async ({ signal }) => { + return streamsRepositoryClient + .fetch('GET /api/streams/{id}', { + signal, + params: { + path: { + id: key, + }, }, - }, - }); + }) + .then((response) => { + if (isWiredStreamGetResponse(response)) { + return { + dashboards: response.dashboards, + inherited_fields: response.inherited_fields, + elasticsearch_assets: [], + effective_lifecycle: response.effective_lifecycle, + name: key, + stream: { + ...response.stream, + }, + }; + } + + if (isUnwiredStreamGetResponse(response)) { + return { + dashboards: response.dashboards, + elasticsearch_assets: response.elasticsearch_assets, + inherited_fields: {}, + effective_lifecycle: response.effective_lifecycle, + name: key, + data_stream_exists: response.data_stream_exists, + stream: { + ...response.stream, + }, + }; + } + throw new Error('Stream detail only supports IngestStreams.'); + }); }, [streamsRepositoryClient, key] ); diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/flush_config.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/flush_config.ts index 560e091c067ab..8f44fc64a71f8 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/flush_config.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/flush_config.ts @@ -7,7 +7,7 @@ import expect from '@kbn/expect'; import { - StreamUpsertRequest, + isGroupStreamDefinitionBase, StreamGetResponse, WiredStreamGetResponse, } from '@kbn/streams-schema'; @@ -17,124 +17,7 @@ import { createStreamsRepositoryAdminClient, } from './helpers/repository_client'; import { disableStreams, enableStreams, indexDocument } from './helpers/requests'; - -type StreamPutItem = Omit & { name: string }; - -const streams: StreamPutItem[] = [ - { - name: 'logs', - stream: { - ingest: { - lifecycle: { dsl: {} }, - processing: [], - wired: { - fields: { - '@timestamp': { - type: 'date', - }, - message: { - type: 'match_only_text', - }, - 'host.name': { - type: 'keyword', - }, - 'log.level': { - type: 'keyword', - }, - 'stream.name': { - type: 'keyword', - }, - }, - }, - routing: [ - { - destination: 'logs.test', - if: { - and: [ - { - field: 'numberfield', - operator: 'gt', - value: 15, - }, - ], - }, - }, - { - destination: 'logs.test2', - if: { - and: [ - { - field: 'field2', - operator: 'eq', - value: 'abc', - }, - ], - }, - }, - ], - }, - }, - }, - { - name: 'logs.test', - stream: { - ingest: { - lifecycle: { inherit: {} }, - routing: [], - processing: [], - wired: { - fields: { - numberfield: { - type: 'long', - }, - }, - }, - }, - }, - }, - { - name: 'logs.test2', - stream: { - ingest: { - lifecycle: { inherit: {} }, - processing: [ - { - grok: { - field: 'message', - patterns: ['%{NUMBER:numberfield}'], - if: { always: {} }, - }, - }, - ], - wired: { - fields: { - field2: { - type: 'keyword', - }, - }, - }, - routing: [], - }, - }, - }, - { - name: 'logs.deeply.nested.streamname', - stream: { - ingest: { - lifecycle: { inherit: {} }, - processing: [], - wired: { - fields: { - field2: { - type: 'keyword', - }, - }, - }, - routing: [], - }, - }, - }, -]; +import { createStreams } from './helpers/create_streams'; export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const roleScopedSupertest = getService('roleScopedSupertest'); @@ -147,7 +30,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { before(async () => { apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest); await enableStreams(apiClient); - await createStreams(); + await createStreams(apiClient); await indexDocuments(); }); @@ -156,7 +39,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }); it('checks whether deeply nested stream is created correctly', async () => { - function getChildNames(stream: StreamGetResponse['stream']) { + function getChildNames(stream: StreamGetResponse['stream']): string[] { + if (isGroupStreamDefinitionBase(stream)) return []; return stream.ingest.routing.map((r) => r.destination); } const logs = await apiClient.fetch('GET /api/streams/{id}', { @@ -224,23 +108,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { expect(logsTest2Response.hits.total).to.eql({ value: 1, relation: 'eq' }); }); - async function createStreams() { - for (const { name: streamId, ...stream } of streams) { - await apiClient - .fetch('PUT /api/streams/{id}', { - params: { - body: { - ...stream, - dashboards: [], - } as StreamUpsertRequest, - path: { id: streamId }, - }, - }) - .expect(200) - .then((response) => expect(response.body.acknowledged).to.eql(true)); - } - } - async function indexDocuments() { // send data that stays in logs const doc = { diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/group_streams.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/group_streams.ts new file mode 100644 index 0000000000000..fc9b74b314bab --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/group_streams.ts @@ -0,0 +1,162 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context'; +import { + StreamsSupertestRepositoryClient, + createStreamsRepositoryAdminClient, +} from './helpers/repository_client'; +import { disableStreams, enableStreams } from './helpers/requests'; +import { createStreams } from './helpers/create_streams'; + +export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { + const roleScopedSupertest = getService('roleScopedSupertest'); + + let apiClient: StreamsSupertestRepositoryClient; + + // An anticipated use case is that a user will want to flush a tree of streams from a config file + describe('GroupStreamDefinition', () => { + describe('CRUD API Operations', () => { + before(async () => { + apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest); + await enableStreams(apiClient); + await createStreams(apiClient); + }); + + after(async () => { + await disableStreams(apiClient); + }); + + it('successfully creates a GroupStream', async () => { + await apiClient + .fetch('PUT /api/streams/{id}', { + params: { + path: { id: 'test-group' }, + body: { + stream: { + group: { + members: ['logs', 'logs.test2'], + }, + }, + dashboards: [], + }, + }, + }) + .expect(200) + .then((response) => expect(response.body.acknowledged).to.eql(true)); + }); + + it('successfully creates a second GroupStream', async () => { + await apiClient + .fetch('PUT /api/streams/{id}', { + params: { + path: { id: 'test-group-too' }, + body: { + stream: { + group: { + members: ['logs.test2'], + }, + }, + dashboards: [], + }, + }, + }) + .expect(200) + .then((response) => expect(response.body.acknowledged).to.eql(true)); + }); + + it('unsuccessfully updates a GroupStream with an uknown stream', async () => { + await apiClient + .fetch('PUT /api/streams/{id}', { + params: { + path: { id: 'test-group' }, + body: { + stream: { + group: { + members: ['logs', 'non-existent-stream'], + }, + }, + dashboards: [], + }, + }, + }) + .expect(404); + }); + + it('unsuccessfully updates a GroupStream with an itself as a member', async () => { + await apiClient + .fetch('PUT /api/streams/{id}', { + params: { + path: { id: 'test-group' }, + body: { + stream: { + group: { + members: ['logs', 'test-group'], + }, + }, + dashboards: [], + }, + }, + }) + .expect(400); + }); + + it('unsuccessfully updates a GroupStream with a forbidden member', async () => { + await apiClient + .fetch('PUT /api/streams/{id}', { + params: { + path: { id: 'test-group' }, + body: { + stream: { + group: { + members: ['logs', 'test-group-too'], + }, + }, + dashboards: [], + }, + }, + }) + .expect(400); + }); + + it('successfully deletes a GroupStream', async () => { + await apiClient + .fetch('DELETE /api/streams/{id}', { + params: { + path: { id: 'test-group-too' }, + }, + }) + .expect(200); + }); + + it('successfully reads a GroupStream', async () => { + const response = await apiClient + .fetch('GET /api/streams/{id}', { + params: { + path: { id: 'test-group' }, + }, + }) + .expect(200); + expect(response.body).to.eql({ + stream: { + name: 'test-group', + group: { + members: ['logs', 'logs.test2'], + }, + }, + dashboards: [], + }); + }); + + it('successfully lists a GroupStream', async () => { + const response = await apiClient.fetch('GET /api/streams').expect(200); + expect(response.body.streams.some((stream) => stream.name === 'test-group')).to.eql(true); + }); + }); + }); +} diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts new file mode 100644 index 0000000000000..bba31350b4145 --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts @@ -0,0 +1,145 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StreamUpsertRequest } from '@kbn/streams-schema'; +import expect from '@kbn/expect'; +import { StreamsSupertestRepositoryClient } from './repository_client'; + +type StreamPutItem = Omit & { name: string }; + +const streams: StreamPutItem[] = [ + { + name: 'logs', + stream: { + ingest: { + lifecycle: { dsl: {} }, + processing: [], + wired: { + fields: { + '@timestamp': { + type: 'date', + }, + message: { + type: 'match_only_text', + }, + 'host.name': { + type: 'keyword', + }, + 'log.level': { + type: 'keyword', + }, + 'stream.name': { + type: 'keyword', + }, + }, + }, + routing: [ + { + destination: 'logs.test', + if: { + and: [ + { + field: 'numberfield', + operator: 'gt', + value: 15, + }, + ], + }, + }, + { + destination: 'logs.test2', + if: { + and: [ + { + field: 'field2', + operator: 'eq', + value: 'abc', + }, + ], + }, + }, + ], + }, + }, + }, + { + name: 'logs.test', + stream: { + ingest: { + lifecycle: { inherit: {} }, + routing: [], + processing: [], + wired: { + fields: { + numberfield: { + type: 'long', + }, + }, + }, + }, + }, + }, + { + name: 'logs.test2', + stream: { + ingest: { + lifecycle: { inherit: {} }, + processing: [ + { + grok: { + field: 'message', + patterns: ['%{NUMBER:numberfield}'], + if: { always: {} }, + }, + }, + ], + wired: { + fields: { + field2: { + type: 'keyword', + }, + }, + }, + routing: [], + }, + }, + }, + { + name: 'logs.deeply.nested.streamname', + stream: { + ingest: { + lifecycle: { inherit: {} }, + processing: [], + wired: { + fields: { + field2: { + type: 'keyword', + }, + }, + }, + routing: [], + }, + }, + }, +]; + +export async function createStreams(apiClient: StreamsSupertestRepositoryClient) { + for (const { name: streamId, ...stream } of streams) { + await apiClient + .fetch('PUT /api/streams/{id}', { + params: { + body: { + ...stream, + dashboards: [], + } as StreamUpsertRequest, + path: { id: streamId }, + }, + }) + .expect(200) + .then((response) => expect(response.body.acknowledged).to.eql(true)); + } +} diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts index aa3861501bc21..00cb65111cb7c 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts @@ -17,6 +17,7 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext) loadTestFile(require.resolve('./schema')); loadTestFile(require.resolve('./processing_simulate')); loadTestFile(require.resolve('./root_stream')); + loadTestFile(require.resolve('./group_streams')); loadTestFile(require.resolve('./lifecycle')); }); } diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts index 64d7b2e0949c2..73e00c456a0a4 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts @@ -10,7 +10,9 @@ import { IngestStreamEffectiveLifecycle, IngestStreamLifecycle, IngestStreamUpsertRequest, + WiredReadStreamDefinition, WiredStreamGetResponse, + asIngestStreamGetResponse, isDslLifecycle, isIlmLifecycle, } from '@kbn/streams-schema'; @@ -34,7 +36,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { ) { const definitions = await Promise.all(streams.map((stream) => getStream(apiClient, stream))); for (const definition of definitions) { - expect(definition.effective_lifecycle).to.eql(expectedLifecycle); + expect(asIngestStreamGetResponse(definition).effective_lifecycle).to.eql(expectedLifecycle); } const dataStreams = await esClient.indices.getDataStream({ name: streams }); @@ -102,10 +104,12 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { expect(response).to.have.property('acknowledged', true); const updatedRootDefinition = await getStream(apiClient, 'logs'); - expect((updatedRootDefinition as WiredStreamGetResponse).stream.ingest.lifecycle).to.eql({ - dsl: { data_retention: '999d' }, - }); - expect(updatedRootDefinition.effective_lifecycle).to.eql({ + expect((updatedRootDefinition as WiredReadStreamDefinition).stream.ingest.lifecycle).to.eql( + { + dsl: { data_retention: '999d' }, + } + ); + expect((updatedRootDefinition as WiredReadStreamDefinition).effective_lifecycle).to.eql({ dsl: { data_retention: '999d' }, from: 'logs', });