diff --git a/x-pack/solutions/observability/packages/utils_server/es/storage/index_adapter/index.ts b/x-pack/solutions/observability/packages/utils_server/es/storage/index_adapter/index.ts index 233f6797c9092..3e5b5b9a1ae18 100644 --- a/x-pack/solutions/observability/packages/utils_server/es/storage/index_adapter/index.ts +++ b/x-pack/solutions/observability/packages/utils_server/es/storage/index_adapter/index.ts @@ -554,6 +554,7 @@ export class StorageIndexAdapter { @@ -79,8 +80,6 @@ export type ForkStreamResponse = AcknowledgeResponse<'created'>; export type ResyncStreamsResponse = AcknowledgeResponse<'updated'>; export type UpsertStreamResponse = AcknowledgeResponse<'updated' | 'created'>; -const LOGS_ROOT_STREAM_NAME = 'logs'; - function isElasticsearch404(error: unknown): error is errors.ResponseError & { statusCode: 404 } { return isResponseError(error) && error.statusCode === 404; } @@ -311,6 +310,10 @@ export class StreamsClient { result: 'created' | 'updated'; parentDefinition?: WiredStreamDefinition; }> { + if (isWiredStreamDefinition(definition)) { + await this.assertNoHierarchicalConflicts(definition.name); + } + const existingDefinition = await this.getStream(definition.name).catch((error) => { if (isDefinitionNotFoundError(error)) { return undefined; @@ -374,6 +377,47 @@ export class StreamsClient { }; } + private async assertNoHierarchicalConflicts(definitionName: string) { + const streamNames = [...getAncestors(definitionName), definitionName]; + const hasConflict = await Promise.all( + streamNames.map((streamName) => this.isStreamNameTaken(streamName)) + ); + const conflicts = streamNames.filter((_, index) => hasConflict[index]); + + if (conflicts.length !== 0) { + throw new NameTakenError( + `Cannot create stream "${definitionName}" due to hierarchical conflicts caused by existing unwired stream definition, index or data stream: [${conflicts.join( + ', ' + )}]` + ); + } + } + + private async isStreamNameTaken(streamName: string): Promise { + try { + const definition = await this.getStream(streamName); + return isUnwiredStreamDefinition(definition); + } catch (error) { + if (!isDefinitionNotFoundError(error)) { + throw error; + } + } + + try { + await this.dependencies.scopedClusterClient.asCurrentUser.indices.get({ + index: streamName, + }); + + return true; + } catch (error) { + if (isElasticsearch404(error)) { + return false; + } + + throw error; + } + } + /** * Validates whether: * - there are no conflicting field types, @@ -621,6 +665,22 @@ export class StreamsClient { return this.dependencies.scopedClusterClient.asCurrentUser.indices .getDataStream({ name }) .then((response) => { + if (response.data_streams.length === 0) { + throw new errors.ResponseError({ + meta: { + aborted: false, + attempts: 1, + connection: null, + context: null, + name: 'resource_not_found_exception', + request: {} as unknown as DiagnosticResult['meta']['request'], + }, + warnings: [], + body: 'resource_not_found_exception', + statusCode: 404, + }); + } + const dataStream = response.data_streams[0]; return dataStream; }); diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/name_taken_error.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/name_taken_error.ts new file mode 100644 index 0000000000000..d377c999fd7d3 --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/name_taken_error.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StatusError } from './status_error'; + +export class NameTakenError extends StatusError { + constructor(message: string) { + super(message, 409); + this.name = 'NameTakenError'; + } +} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/root_stream_definition.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/root_stream_definition.ts index 17ae7ae7f5049..5701d0f4f8a5e 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/root_stream_definition.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/root_stream_definition.ts @@ -5,10 +5,12 @@ * 2.0. */ -import { WiredStreamDefinition } from '@kbn/streams-schema'; +import { WiredStreamDefinition, getSegments } from '@kbn/streams-schema'; + +export const LOGS_ROOT_STREAM_NAME = 'logs'; export const rootStreamDefinition: WiredStreamDefinition = { - name: 'logs', + name: LOGS_ROOT_STREAM_NAME, ingest: { lifecycle: { dsl: {} }, processing: [], @@ -34,3 +36,8 @@ export const rootStreamDefinition: WiredStreamDefinition = { }, }, }; + +export function hasSupportedStreamsRoot(streamName: string) { + const root = getSegments(streamName)[0]; + return [LOGS_ROOT_STREAM_NAME].includes(root); +} diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/create_server_route.ts b/x-pack/solutions/observability/plugins/streams/server/routes/create_server_route.ts index 83eec795f015d..de74e7e39083a 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/create_server_route.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/create_server_route.ts @@ -7,7 +7,7 @@ import { createServerRouteFactory } from '@kbn/server-route-repository'; import { CreateServerRouteFactory } from '@kbn/server-route-repository-utils/src/typings'; -import { badRequest, forbidden, internal, notFound } from '@hapi/boom'; +import { badRequest, conflict, forbidden, internal, notFound } from '@hapi/boom'; import { errors } from '@elastic/elasticsearch'; import { StreamsRouteHandlerResources } from './types'; import { StatusError } from '../lib/streams/errors/status_error'; @@ -33,6 +33,9 @@ export const createServerRoute: CreateServerRouteFactory< case 404: throw notFound(error); + case 409: + throw conflict(error); + case 500: throw internal(error); } diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/route.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/route.ts index c8a95e3f23206..f711ebd2992ee 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/route.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/crud/route.ts @@ -10,9 +10,12 @@ import { isGroupStreamDefinition, StreamDefinition, StreamGetResponse, + isWiredStreamDefinition, streamUpsertRequestSchema, } from '@kbn/streams-schema'; import { z } from '@kbn/zod'; +import { badData, badRequest } from '@hapi/boom'; +import { hasSupportedStreamsRoot } from '../../../lib/streams/root_stream_definition'; import { UpsertStreamResponse } from '../../../lib/streams/client'; import { createServerRoute } from '../../create_server_route'; import { readStream } from './read_stream'; @@ -148,6 +151,18 @@ export const editStreamRoute = createServerRoute({ }), handler: async ({ params, request, getScopedClients }): Promise => { const { streamsClient } = await getScopedClients({ request }); + + if (!(await streamsClient.isStreamsEnabled())) { + throw badData('Streams are not enabled'); + } + + if ( + isWiredStreamDefinition({ ...params.body.stream, name: params.path.id }) && + !hasSupportedStreamsRoot(params.path.id) + ) { + throw badRequest('Cannot create wired stream due to unsupported root stream'); + } + return await streamsClient.upsertStream({ request: params.body, name: params.path.id, diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/enablement/route.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/enablement/route.ts index 7eafbeb4e7056..8972b35389365 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/enablement/route.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/enablement/route.ts @@ -6,6 +6,8 @@ */ import { z } from '@kbn/zod'; +import { conflict } from '@hapi/boom'; +import { NameTakenError } from '../../../lib/streams/errors/name_taken_error'; import { DisableStreamsResponse, EnableStreamsResponse } from '../../../lib/streams/client'; import { createServerRoute } from '../../create_server_route'; @@ -27,7 +29,15 @@ export const enableStreamsRoute = createServerRoute({ request, }); - return await streamsClient.enableStreams(); + try { + return await streamsClient.enableStreams(); + } catch (error) { + if (error instanceof NameTakenError) { + throw conflict(`Cannot enable Streams, failed to create root stream: ${error.message}`); + } + + throw error; + } }, });