Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ export class StorageIndexAdapter<TStorageSettings extends IndexStorageSettings,
request: {} as unknown as DiagnosticResult['meta']['request'],
},
warnings: [],
body: 'resource_not_found_exception',
statusCode: 404,
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* 2.0.
*/

import { errors } from '@elastic/elasticsearch';
import { DiagnosticResult, errors } from '@elastic/elasticsearch';
import {
IndicesDataStream,
QueryDslQueryContainer,
Expand Down Expand Up @@ -50,7 +50,7 @@ import {
validateStreamLifecycle,
validateStreamTypeChanges,
} from './helpers/validate_stream';
import { rootStreamDefinition } from './root_stream_definition';
import { LOGS_ROOT_STREAM_NAME, rootStreamDefinition } from './root_stream_definition';
import { StreamsStorageClient } from './service';
import {
checkAccess,
Expand All @@ -64,6 +64,7 @@ import { DefinitionNotFoundError } from './errors/definition_not_found_error';
import { MalformedStreamIdError } from './errors/malformed_stream_id_error';
import { SecurityError } from './errors/security_error';
import { findInheritedLifecycle, findInheritingStreams } from './helpers/lifecycle';
import { NameTakenError } from './errors/name_taken_error';
import { MalformedStreamError } from './errors/malformed_stream_error';

interface AcknowledgeResponse<TResult extends Result> {
Expand All @@ -79,8 +80,6 @@ export type ForkStreamResponse = AcknowledgeResponse<'created'>;
export type ResyncStreamsResponse = AcknowledgeResponse<'updated'>;
export type UpsertStreamResponse = AcknowledgeResponse<'updated' | 'created'>;

const LOGS_ROOT_STREAM_NAME = 'logs';

function isElasticsearch404(error: unknown): error is errors.ResponseError & { statusCode: 404 } {
return isResponseError(error) && error.statusCode === 404;
}
Expand Down Expand Up @@ -311,6 +310,10 @@ export class StreamsClient {
result: 'created' | 'updated';
parentDefinition?: WiredStreamDefinition;
}> {
if (isWiredStreamDefinition(definition)) {
await this.assertNoHierarchicalConflicts(definition.name);
}

const existingDefinition = await this.getStream(definition.name).catch((error) => {
if (isDefinitionNotFoundError(error)) {
return undefined;
Expand Down Expand Up @@ -374,6 +377,47 @@ export class StreamsClient {
};
}

private async assertNoHierarchicalConflicts(definitionName: string) {
const streamNames = [...getAncestors(definitionName), definitionName];
const hasConflict = await Promise.all(
streamNames.map((streamName) => this.isStreamNameTaken(streamName))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small note: If we used arrow functions to define our methods we could write this simply as streamNames.map(this.isStreamNameTaken) since the context would already be bound.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm always in favour of referencing functions vs inline callbacks, I wouldn't mind if you change the method declaration to an arrow function.

);
const conflicts = streamNames.filter((_, index) => hasConflict[index]);

if (conflicts.length !== 0) {
throw new NameTakenError(
`Cannot create stream "${definitionName}" due to hierarchical conflicts caused by existing unwired stream definition, index or data stream: [${conflicts.join(
', '
)}]`
);
}
}

private async isStreamNameTaken(streamName: string): Promise<boolean> {
try {
const definition = await this.getStream(streamName);
return isUnwiredStreamDefinition(definition);
} catch (error) {
if (!isDefinitionNotFoundError(error)) {
throw error;
}
}

try {
await this.dependencies.scopedClusterClient.asCurrentUser.indices.get({
index: streamName,
});
Comment on lines +401 to +409
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super happy with the side effect driven flow here but flipping it just feels so verbose.


return true;
} catch (error) {
if (isElasticsearch404(error)) {
return false;
}

throw error;
}
}

/**
* Validates whether:
* - there are no conflicting field types,
Expand Down Expand Up @@ -621,6 +665,22 @@ export class StreamsClient {
return this.dependencies.scopedClusterClient.asCurrentUser.indices
.getDataStream({ name })
.then((response) => {
if (response.data_streams.length === 0) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where the original error came from, if there is no data stream this API does not throw any error, it simply returns an empty list.

Which caused the flow to continue to try to translate this undefined object into a Classic stream definition hence causing the undefined.name error.

throw new errors.ResponseError({
meta: {
aborted: false,
attempts: 1,
connection: null,
context: null,
name: 'resource_not_found_exception',
request: {} as unknown as DiagnosticResult['meta']['request'],
},
warnings: [],
body: 'resource_not_found_exception',
statusCode: 404,
});
}

const dataStream = response.data_streams[0];
return dataStream;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from './status_error';

export class NameTakenError extends StatusError {
constructor(message: string) {
super(message, 409);
this.name = 'NameTakenError';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
* 2.0.
*/

import { WiredStreamDefinition } from '@kbn/streams-schema';
import { WiredStreamDefinition, getSegments } from '@kbn/streams-schema';

export const LOGS_ROOT_STREAM_NAME = 'logs';

export const rootStreamDefinition: WiredStreamDefinition = {
name: 'logs',
name: LOGS_ROOT_STREAM_NAME,
ingest: {
lifecycle: { dsl: {} },
processing: [],
Expand All @@ -34,3 +36,8 @@ export const rootStreamDefinition: WiredStreamDefinition = {
},
},
};

export function hasSupportedStreamsRoot(streamName: string) {
const root = getSegments(streamName)[0];
return [LOGS_ROOT_STREAM_NAME].includes(root);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import { createServerRouteFactory } from '@kbn/server-route-repository';
import { CreateServerRouteFactory } from '@kbn/server-route-repository-utils/src/typings';
import { badRequest, forbidden, internal, notFound } from '@hapi/boom';
import { badRequest, conflict, forbidden, internal, notFound } from '@hapi/boom';
import { errors } from '@elastic/elasticsearch';
import { StreamsRouteHandlerResources } from './types';
import { StatusError } from '../lib/streams/errors/status_error';
Expand All @@ -33,6 +33,9 @@ export const createServerRoute: CreateServerRouteFactory<
case 404:
throw notFound(error);

case 409:
throw conflict(error);

case 500:
throw internal(error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import {
isGroupStreamDefinition,
StreamDefinition,
StreamGetResponse,
isWiredStreamDefinition,
streamUpsertRequestSchema,
} from '@kbn/streams-schema';
import { z } from '@kbn/zod';
import { badData, badRequest } from '@hapi/boom';
import { hasSupportedStreamsRoot } from '../../../lib/streams/root_stream_definition';
import { UpsertStreamResponse } from '../../../lib/streams/client';
import { createServerRoute } from '../../create_server_route';
import { readStream } from './read_stream';
Expand Down Expand Up @@ -148,6 +151,18 @@ export const editStreamRoute = createServerRoute({
}),
handler: async ({ params, request, getScopedClients }): Promise<UpsertStreamResponse> => {
const { streamsClient } = await getScopedClients({ request });

if (!(await streamsClient.isStreamsEnabled())) {
throw badData('Streams are not enabled');
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

badData results in a HTTP 422, since the request is syntactically correct and would pass our validation but there is a semantic error of streams not being enabled yet.

}

if (
isWiredStreamDefinition({ ...params.body.stream, name: params.path.id }) &&
!hasSupportedStreamsRoot(params.path.id)
) {
throw badRequest('Cannot create wired stream due to unsupported root stream');
}

return await streamsClient.upsertStream({
request: params.body,
name: params.path.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
*/

import { z } from '@kbn/zod';
import { conflict } from '@hapi/boom';
import { NameTakenError } from '../../../lib/streams/errors/name_taken_error';
import { DisableStreamsResponse, EnableStreamsResponse } from '../../../lib/streams/client';
import { createServerRoute } from '../../create_server_route';

Expand All @@ -27,7 +29,15 @@ export const enableStreamsRoute = createServerRoute({
request,
});

return await streamsClient.enableStreams();
try {
return await streamsClient.enableStreams();
} catch (error) {
if (error instanceof NameTakenError) {
throw conflict(`Cannot enable Streams, failed to create root stream: ${error.message}`);
}

throw error;
}
},
});

Expand Down