diff --git a/docs/developer/plugin-list.asciidoc b/docs/developer/plugin-list.asciidoc index 87f74ebdb7bb1..ea84b25dcda4d 100644 --- a/docs/developer/plugin-list.asciidoc +++ b/docs/developer/plugin-list.asciidoc @@ -909,6 +909,10 @@ routes, etc. |The stack_connectors plugin provides connector types shipped with Kibana, built on top of the framework provided in the actions plugin. +|{kib-repo}blob/{branch}/x-pack/plugins/streams/README.md[streams] +|This plugin provides an interface to manage streams + + |{kib-repo}blob/{branch}/x-pack/plugins/observability_solution/synthetics/README.md[synthetics] |The purpose of this plugin is to provide users of Heartbeat more visibility of what's happening in their infrastructure. diff --git a/package.json b/package.json index b208a28cdb0d2..a1bf3c22f6cd4 100644 --- a/package.json +++ b/package.json @@ -934,6 +934,7 @@ "@kbn/status-plugin-a-plugin": "link:test/server_integration/plugins/status_plugin_a", "@kbn/status-plugin-b-plugin": "link:test/server_integration/plugins/status_plugin_b", "@kbn/std": "link:packages/kbn-std", + "@kbn/streams-plugin": "link:x-pack/plugins/streams", "@kbn/synthetics-plugin": "link:x-pack/plugins/observability_solution/synthetics", "@kbn/synthetics-private-location": "link:x-pack/packages/kbn-synthetics-private-location", "@kbn/task-manager-fixture-plugin": "link:x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture", diff --git a/packages/kbn-optimizer/limits.yml b/packages/kbn-optimizer/limits.yml index 533e632813960..6bbef17fe6e6c 100644 --- a/packages/kbn-optimizer/limits.yml +++ b/packages/kbn-optimizer/limits.yml @@ -161,6 +161,7 @@ pageLoadAssetSize: spaces: 57868 stackAlerts: 58316 stackConnectors: 67227 + streams: 16742 synthetics: 55971 telemetry: 51957 telemetryManagementSection: 38586 diff --git a/tsconfig.base.json b/tsconfig.base.json index 3880224e8601c..9b72acee0fcd0 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -1844,6 +1844,8 @@ "@kbn/stdio-dev-helpers/*": ["packages/kbn-stdio-dev-helpers/*"], "@kbn/storybook": ["packages/kbn-storybook"], "@kbn/storybook/*": ["packages/kbn-storybook/*"], + "@kbn/streams-plugin": ["x-pack/plugins/streams"], + "@kbn/streams-plugin/*": ["x-pack/plugins/streams/*"], "@kbn/synthetics-e2e": ["x-pack/plugins/observability_solution/synthetics/e2e"], "@kbn/synthetics-e2e/*": ["x-pack/plugins/observability_solution/synthetics/e2e/*"], "@kbn/synthetics-plugin": ["x-pack/plugins/observability_solution/synthetics"], diff --git a/x-pack/plugins/streams/README.md b/x-pack/plugins/streams/README.md new file mode 100644 index 0000000000000..9a3539a33535d --- /dev/null +++ b/x-pack/plugins/streams/README.md @@ -0,0 +1,3 @@ +# Streams Plugin + +This plugin provides an interface to manage streams \ No newline at end of file diff --git a/x-pack/plugins/streams/common/config.ts b/x-pack/plugins/streams/common/config.ts new file mode 100644 index 0000000000000..3371b1b6d8cbc --- /dev/null +++ b/x-pack/plugins/streams/common/config.ts @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { schema, TypeOf } from '@kbn/config-schema'; + +export const configSchema = schema.object({}); + +export type StreamsConfig = TypeOf; + +/** + * The following map is passed to the server plugin setup under the + * exposeToBrowser: option, and controls which of the above config + * keys are allow-listed to be available in the browser config. + * + * NOTE: anything exposed here will be visible in the UI dev tools, + * and therefore MUST NOT be anything that is sensitive information! + */ +export const exposeToBrowserConfig = {} as const; + +type ValidKeys = keyof { + [K in keyof typeof exposeToBrowserConfig as (typeof exposeToBrowserConfig)[K] extends true + ? K + : never]: true; +}; + +export type StreamsPublicConfig = Pick; diff --git a/x-pack/plugins/streams/common/constants.ts b/x-pack/plugins/streams/common/constants.ts new file mode 100644 index 0000000000000..d7595990ded6e --- /dev/null +++ b/x-pack/plugins/streams/common/constants.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const ASSET_VERSION = 1; +export const STREAMS_INDEX = '.kibana_streams'; diff --git a/x-pack/plugins/streams/common/types.ts b/x-pack/plugins/streams/common/types.ts new file mode 100644 index 0000000000000..6cdb2f923f6f4 --- /dev/null +++ b/x-pack/plugins/streams/common/types.ts @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; + +const stringOrNumberOrBoolean = z.union([z.string(), z.number(), z.boolean()]); + +export const filterConditionSchema = z.object({ + field: z.string(), + operator: z.enum(['eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'contains', 'startsWith', 'endsWith']), + value: stringOrNumberOrBoolean, +}); + +export type FilterCondition = z.infer; + +export interface AndCondition { + and: Condition[]; +} + +export interface RerouteOrCondition { + or: Condition[]; +} + +export type Condition = FilterCondition | AndCondition | RerouteOrCondition | undefined; + +export const conditionSchema: z.ZodType = z.lazy(() => + z.union([ + filterConditionSchema, + z.object({ and: z.array(conditionSchema) }), + z.object({ or: z.array(conditionSchema) }), + ]) +); + +export const grokProcessingDefinitionSchema = z.object({ + type: z.literal('grok'), + field: z.string(), + patterns: z.array(z.string()), + pattern_definitions: z.optional(z.record(z.string())), +}); + +export const dissectProcessingDefinitionSchema = z.object({ + type: z.literal('dissect'), + field: z.string(), + pattern: z.string(), +}); + +export const processingDefinitionSchema = z.object({ + condition: z.optional(conditionSchema), + config: z.discriminatedUnion('type', [ + grokProcessingDefinitionSchema, + dissectProcessingDefinitionSchema, + ]), +}); + +export type ProcessingDefinition = z.infer; + +export const fieldDefinitionSchema = z.object({ + name: z.string(), + type: z.enum(['keyword', 'match_only_text', 'long', 'double', 'date', 'boolean', 'ip']), +}); + +export type FieldDefinition = z.infer; + +export const streamWithoutIdDefinitonSchema = z.object({ + processing: z.array(processingDefinitionSchema).default([]), + fields: z.array(fieldDefinitionSchema).default([]), + children: z + .array( + z.object({ + id: z.string(), + condition: conditionSchema, + }) + ) + .default([]), +}); + +export type StreamWithoutIdDefinition = z.infer; + +export const streamDefinitonSchema = streamWithoutIdDefinitonSchema.extend({ + id: z.string(), +}); + +export type StreamDefinition = z.infer; + +export const streamDefinitonWithoutChildrenSchema = streamDefinitonSchema.omit({ children: true }); + +export type StreamWithoutChildrenDefinition = z.infer; diff --git a/x-pack/plugins/streams/jest.config.js b/x-pack/plugins/streams/jest.config.js new file mode 100644 index 0000000000000..43d4fd28da9b5 --- /dev/null +++ b/x-pack/plugins/streams/jest.config.js @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +module.exports = { + preset: '@kbn/test', + rootDir: '../../..', + roots: ['/x-pack/plugins/streams'], + coverageDirectory: '/target/kibana-coverage/jest/x-pack/plugins/streams', + coverageReporters: ['text', 'html'], + collectCoverageFrom: ['/x-pack/plugins/streams/{common,public,server}/**/*.{js,ts,tsx}'], +}; diff --git a/x-pack/plugins/streams/kibana.jsonc b/x-pack/plugins/streams/kibana.jsonc new file mode 100644 index 0000000000000..06c37ed245cf1 --- /dev/null +++ b/x-pack/plugins/streams/kibana.jsonc @@ -0,0 +1,28 @@ +{ + "type": "plugin", + "id": "@kbn/streams-plugin", + "owner": "@simianhacker @flash1293 @dgieselaar", + "description": "A manager for Streams", + "group": "observability", + "visibility": "private", + "plugin": { + "id": "streams", + "configPath": ["xpack", "streams"], + "browser": true, + "server": true, + "requiredPlugins": [ + "data", + "security", + "encryptedSavedObjects", + "usageCollection", + "licensing", + "taskManager" + ], + "optionalPlugins": [ + "cloud", + "serverless" + ], + "requiredBundles": [ + ] + } +} diff --git a/x-pack/plugins/streams/public/index.ts b/x-pack/plugins/streams/public/index.ts new file mode 100644 index 0000000000000..5b83ea1d297d3 --- /dev/null +++ b/x-pack/plugins/streams/public/index.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { PluginInitializer, PluginInitializerContext } from '@kbn/core/public'; +import { Plugin } from './plugin'; + +export const plugin: PluginInitializer<{}, {}> = (context: PluginInitializerContext) => { + return new Plugin(context); +}; diff --git a/x-pack/plugins/streams/public/plugin.ts b/x-pack/plugins/streams/public/plugin.ts new file mode 100644 index 0000000000000..f35d18e06ff70 --- /dev/null +++ b/x-pack/plugins/streams/public/plugin.ts @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { CoreSetup, CoreStart, PluginInitializerContext } from '@kbn/core/public'; +import { Logger } from '@kbn/logging'; + +import type { StreamsPublicConfig } from '../common/config'; +import { StreamsPluginClass, StreamsPluginSetup, StreamsPluginStart } from './types'; + +export class Plugin implements StreamsPluginClass { + public config: StreamsPublicConfig; + public logger: Logger; + + constructor(context: PluginInitializerContext<{}>) { + this.config = context.config.get(); + this.logger = context.logger.get(); + } + + setup(core: CoreSetup, pluginSetup: StreamsPluginSetup) { + return {}; + } + + start(core: CoreStart) { + return {}; + } + + stop() {} +} diff --git a/x-pack/plugins/streams/public/types.ts b/x-pack/plugins/streams/public/types.ts new file mode 100644 index 0000000000000..61e5fa94098f0 --- /dev/null +++ b/x-pack/plugins/streams/public/types.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Plugin as PluginClass } from '@kbn/core/public'; + +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface StreamsPluginSetup {} + +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface StreamsPluginStart {} + +export type StreamsPluginClass = PluginClass<{}, {}, StreamsPluginSetup, StreamsPluginStart>; diff --git a/x-pack/plugins/streams/server/index.ts b/x-pack/plugins/streams/server/index.ts new file mode 100644 index 0000000000000..bd8aee304ad15 --- /dev/null +++ b/x-pack/plugins/streams/server/index.ts @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { PluginInitializerContext } from '@kbn/core-plugins-server'; +import { StreamsConfig } from '../common/config'; +import { StreamsPluginSetup, StreamsPluginStart, config } from './plugin'; +import { StreamsRouteRepository } from './routes'; + +export type { StreamsConfig, StreamsPluginSetup, StreamsPluginStart, StreamsRouteRepository }; +export { config }; + +export const plugin = async (context: PluginInitializerContext) => { + const { StreamsPlugin } = await import('./plugin'); + return new StreamsPlugin(context); +}; diff --git a/x-pack/plugins/streams/server/lib/streams/component_templates/generate_layer.ts b/x-pack/plugins/streams/server/lib/streams/component_templates/generate_layer.ts new file mode 100644 index 0000000000000..82c89c9ab9171 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/component_templates/generate_layer.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + ClusterPutComponentTemplateRequest, + MappingProperty, +} from '@elastic/elasticsearch/lib/api/types'; +import { StreamDefinition } from '../../../../common/types'; +import { ASSET_VERSION } from '../../../../common/constants'; +import { logsSettings } from './logs_layer'; +import { isRoot } from '../helpers/hierarchy'; +import { getComponentTemplateName } from './name'; + +export function generateLayer( + id: string, + definition: StreamDefinition +): ClusterPutComponentTemplateRequest { + const properties: Record = {}; + definition.fields.forEach((field) => { + properties[field.name] = { + type: field.type, + }; + }); + return { + name: getComponentTemplateName(id), + template: { + settings: isRoot(definition.id) ? logsSettings : {}, + mappings: { + subobjects: false, + properties, + }, + }, + version: ASSET_VERSION, + _meta: { + managed: true, + description: `Default settings for the ${id} stream`, + }, + }; +} diff --git a/x-pack/plugins/streams/server/lib/streams/component_templates/logs_layer.ts b/x-pack/plugins/streams/server/lib/streams/component_templates/logs_layer.ts new file mode 100644 index 0000000000000..6b41d04131c56 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/component_templates/logs_layer.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IndicesIndexSettings } from '@elastic/elasticsearch/lib/api/types'; + +export const logsSettings: IndicesIndexSettings = { + index: { + lifecycle: { + name: 'logs', + }, + codec: 'best_compression', + mapping: { + total_fields: { + ignore_dynamic_beyond_limit: true, + }, + ignore_malformed: true, + }, + }, +}; diff --git a/x-pack/plugins/streams/server/lib/streams/component_templates/manage_component_templates.ts b/x-pack/plugins/streams/server/lib/streams/component_templates/manage_component_templates.ts new file mode 100644 index 0000000000000..a7d707a4ce42a --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/component_templates/manage_component_templates.ts @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import { Logger } from '@kbn/logging'; +import { ClusterPutComponentTemplateRequest } from '@elastic/elasticsearch/lib/api/types'; +import { retryTransientEsErrors } from '../helpers/retry'; + +interface DeleteComponentOptions { + esClient: ElasticsearchClient; + name: string; + logger: Logger; +} + +interface ComponentManagementOptions { + esClient: ElasticsearchClient; + component: ClusterPutComponentTemplateRequest; + logger: Logger; +} + +export async function deleteComponent({ esClient, name, logger }: DeleteComponentOptions) { + try { + await retryTransientEsErrors( + () => esClient.cluster.deleteComponentTemplate({ name }, { ignore: [404] }), + { logger } + ); + } catch (error: any) { + logger.error(`Error deleting component template: ${error.message}`); + throw error; + } +} + +export async function upsertComponent({ esClient, component, logger }: ComponentManagementOptions) { + try { + await retryTransientEsErrors(() => esClient.cluster.putComponentTemplate(component), { + logger, + }); + logger.debug(() => `Installed component template: ${JSON.stringify(component)}`); + } catch (error: any) { + logger.error(`Error updating component template: ${error.message}`); + throw error; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/component_templates/name.ts b/x-pack/plugins/streams/server/lib/streams/component_templates/name.ts new file mode 100644 index 0000000000000..6ea05b9a53b28 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/component_templates/name.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export function getComponentTemplateName(id: string) { + return `${id}@stream.layer`; +} diff --git a/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts b/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts new file mode 100644 index 0000000000000..812739db56c73 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ElasticsearchClient, Logger } from '@kbn/core/server'; +import { retryTransientEsErrors } from '../helpers/retry'; + +interface DataStreamManagementOptions { + esClient: ElasticsearchClient; + name: string; + logger: Logger; +} + +interface DeleteDataStreamOptions { + esClient: ElasticsearchClient; + name: string; + logger: Logger; +} + +interface RolloverDataStreamOptions { + esClient: ElasticsearchClient; + name: string; + logger: Logger; +} + +export async function upsertDataStream({ esClient, name, logger }: DataStreamManagementOptions) { + const dataStreamExists = await esClient.indices.exists({ index: name }); + if (dataStreamExists) { + return; + } + try { + await retryTransientEsErrors(() => esClient.indices.createDataStream({ name }), { logger }); + logger.debug(() => `Installed data stream: ${name}`); + } catch (error: any) { + logger.error(`Error creating data stream: ${error.message}`); + throw error; + } +} + +export async function deleteDataStream({ esClient, name, logger }: DeleteDataStreamOptions) { + try { + await retryTransientEsErrors( + () => esClient.indices.deleteDataStream({ name }, { ignore: [404] }), + { logger } + ); + } catch (error: any) { + logger.error(`Error deleting data stream: ${error.message}`); + throw error; + } +} + +export async function rolloverDataStreamIfNecessary({ + esClient, + name, + logger, +}: RolloverDataStreamOptions) { + const dataStreams = await esClient.indices.getDataStream({ name: `${name},${name}.*` }); + for (const dataStream of dataStreams.data_streams) { + const currentMappings = + Object.values( + await esClient.indices.getMapping({ + index: dataStream.indices.at(-1)?.index_name, + }) + )[0].mappings.properties || {}; + const simulatedIndex = await esClient.indices.simulateIndexTemplate({ name: dataStream.name }); + const simulatedMappings = simulatedIndex.template.mappings.properties || {}; + + // check whether the same fields and same types are listed (don't check for other mapping attributes) + const isDifferent = + Object.values(simulatedMappings).length !== Object.values(currentMappings).length || + Object.entries(simulatedMappings || {}).some(([fieldName, { type }]) => { + const currentType = currentMappings[fieldName]?.type; + return currentType !== type; + }); + + if (!isDifferent) { + continue; + } + + try { + await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), { + logger, + }); + logger.debug(() => `Rolled over data stream: ${dataStream.name}`); + } catch (error: any) { + logger.error(`Error rolling over data stream: ${error.message}`); + throw error; + } + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/component_template_not_found.ts b/x-pack/plugins/streams/server/lib/streams/errors/component_template_not_found.ts new file mode 100644 index 0000000000000..a7e9cebf98507 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/component_template_not_found.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class ComponentTemplateNotFound extends Error { + constructor(message: string) { + super(message); + this.name = 'ComponentTemplateNotFound'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/definition_id_invalid.ts b/x-pack/plugins/streams/server/lib/streams/errors/definition_id_invalid.ts new file mode 100644 index 0000000000000..817e8f67bf25d --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/definition_id_invalid.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class DefinitionIdInvalid extends Error { + constructor(message: string) { + super(message); + this.name = 'DefinitionIdInvalid'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/definition_not_found.ts b/x-pack/plugins/streams/server/lib/streams/errors/definition_not_found.ts new file mode 100644 index 0000000000000..f7e60193baa5f --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/definition_not_found.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class DefinitionNotFound extends Error { + constructor(message: string) { + super(message); + this.name = 'DefinitionNotFound'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/fork_condition_missing.ts b/x-pack/plugins/streams/server/lib/streams/errors/fork_condition_missing.ts new file mode 100644 index 0000000000000..713751dbe4363 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/fork_condition_missing.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class ForkConditionMissing extends Error { + constructor(message: string) { + super(message); + this.name = 'ForkConditionMissing'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/id_conflict_error.ts b/x-pack/plugins/streams/server/lib/streams/errors/id_conflict_error.ts new file mode 100644 index 0000000000000..a24c7357379fa --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/id_conflict_error.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class IdConflict extends Error { + constructor(message: string) { + super(message); + this.name = 'IdConflict'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/index.ts b/x-pack/plugins/streams/server/lib/streams/errors/index.ts new file mode 100644 index 0000000000000..73842ef3018fe --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/index.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './definition_id_invalid'; +export * from './definition_not_found'; +export * from './id_conflict_error'; +export * from './permission_denied'; +export * from './security_exception'; +export * from './index_template_not_found'; +export * from './fork_condition_missing'; +export * from './component_template_not_found'; diff --git a/x-pack/plugins/streams/server/lib/streams/errors/index_template_not_found.ts b/x-pack/plugins/streams/server/lib/streams/errors/index_template_not_found.ts new file mode 100644 index 0000000000000..4f4735dd15fa1 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/index_template_not_found.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class IndexTemplateNotFound extends Error { + constructor(message: string) { + super(message); + this.name = 'IndexTemplateNotFound'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/ingest_pipeline_not_found.ts b/x-pack/plugins/streams/server/lib/streams/errors/ingest_pipeline_not_found.ts new file mode 100644 index 0000000000000..8bf9bbd4933ce --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/ingest_pipeline_not_found.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class IngestPipelineNotFound extends Error { + constructor(message: string) { + super(message); + this.name = 'IngestPipelineNotFound'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/malformed_children.ts b/x-pack/plugins/streams/server/lib/streams/errors/malformed_children.ts new file mode 100644 index 0000000000000..699c4cdd5b1ef --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/malformed_children.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class MalformedChildren extends Error { + constructor(message: string) { + super(message); + this.name = 'MalformedChildren'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/malformed_fields.ts b/x-pack/plugins/streams/server/lib/streams/errors/malformed_fields.ts new file mode 100644 index 0000000000000..b8f7ac1392610 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/malformed_fields.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class MalformedFields extends Error { + constructor(message: string) { + super(message); + this.name = 'MalformedFields'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/malformed_stream_id.ts b/x-pack/plugins/streams/server/lib/streams/errors/malformed_stream_id.ts new file mode 100644 index 0000000000000..2f988204c74b0 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/malformed_stream_id.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class MalformedStreamId extends Error { + constructor(message: string) { + super(message); + this.name = 'MalformedStreamId'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/permission_denied.ts b/x-pack/plugins/streams/server/lib/streams/errors/permission_denied.ts new file mode 100644 index 0000000000000..f0133e28063ca --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/permission_denied.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class PermissionDenied extends Error { + constructor(message: string) { + super(message); + this.name = 'PermissionDenied'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/security_exception.ts b/x-pack/plugins/streams/server/lib/streams/errors/security_exception.ts new file mode 100644 index 0000000000000..0b4ae450c2530 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/security_exception.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class SecurityException extends Error { + constructor(message: string) { + super(message); + this.name = 'SecurityException'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.test.ts b/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.test.ts new file mode 100644 index 0000000000000..aab7f27f12d14 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.test.ts @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { conditionToPainless } from './condition_to_painless'; + +const operatorConditionAndResutls = [ + { + condition: { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' }, + result: 'ctx.log?.logger == "nginx_proxy"', + }, + { + condition: { field: 'log.logger', operator: 'neq' as const, value: 'nginx_proxy' }, + result: 'ctx.log?.logger != "nginx_proxy"', + }, + { + condition: { field: 'http.response.status_code', operator: 'lt' as const, value: 500 }, + result: 'ctx.http?.response?.status_code < 500', + }, + { + condition: { field: 'http.response.status_code', operator: 'lte' as const, value: 500 }, + result: 'ctx.http?.response?.status_code <= 500', + }, + { + condition: { field: 'http.response.status_code', operator: 'gt' as const, value: 500 }, + result: 'ctx.http?.response?.status_code > 500', + }, + { + condition: { field: 'http.response.status_code', operator: 'gte' as const, value: 500 }, + result: 'ctx.http?.response?.status_code >= 500', + }, + { + condition: { field: 'log.logger', operator: 'startsWith' as const, value: 'nginx' }, + result: 'ctx.log?.logger.startsWith("nginx")', + }, + { + condition: { field: 'log.logger', operator: 'endsWith' as const, value: 'proxy' }, + result: 'ctx.log?.logger.endsWith("proxy")', + }, + { + condition: { field: 'log.logger', operator: 'contains' as const, value: 'proxy' }, + result: 'ctx.log?.logger.contains("proxy")', + }, +]; + +describe('conditionToPainless', () => { + describe('operators', () => { + operatorConditionAndResutls.forEach((setup) => { + test(`${setup.condition.operator}`, () => { + expect(conditionToPainless(setup.condition)).toEqual(setup.result); + }); + }); + }); + + describe('and', () => { + test('simple', () => { + const condition = { + and: [ + { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' }, + { field: 'log.level', operator: 'eq' as const, value: 'error' }, + ], + }; + expect( + expect(conditionToPainless(condition)).toEqual( + 'ctx.log?.logger == "nginx_proxy" && ctx.log?.level == "error"' + ) + ); + }); + }); + + describe('or', () => { + test('simple', () => { + const condition = { + or: [ + { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' }, + { field: 'log.level', operator: 'eq' as const, value: 'error' }, + ], + }; + expect( + expect(conditionToPainless(condition)).toEqual( + 'ctx.log?.logger == "nginx_proxy" || ctx.log?.level == "error"' + ) + ); + }); + }); + + describe('nested', () => { + test('and with a filter and or with 2 filters', () => { + const condition = { + and: [ + { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' }, + { + or: [ + { field: 'log.level', operator: 'eq' as const, value: 'error' }, + { field: 'log.level', operator: 'eq' as const, value: 'ERROR' }, + ], + }, + ], + }; + expect( + expect(conditionToPainless(condition)).toEqual( + 'ctx.log?.logger == "nginx_proxy" && (ctx.log?.level == "error" || ctx.log?.level == "ERROR")' + ) + ); + }); + test('and with 2 or with filters', () => { + const condition = { + and: [ + { + or: [ + { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' }, + { field: 'service.name', operator: 'eq' as const, value: 'nginx' }, + ], + }, + { + or: [ + { field: 'log.level', operator: 'eq' as const, value: 'error' }, + { field: 'log.level', operator: 'eq' as const, value: 'ERROR' }, + ], + }, + ], + }; + expect( + expect(conditionToPainless(condition)).toEqual( + '(ctx.log?.logger == "nginx_proxy" || ctx.service?.name == "nginx") && (ctx.log?.level == "error" || ctx.log?.level == "ERROR")' + ) + ); + }); + }); +}); diff --git a/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts b/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts new file mode 100644 index 0000000000000..539ad3603535b --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { isBoolean, isString } from 'lodash'; +import { + AndCondition, + Condition, + conditionSchema, + FilterCondition, + filterConditionSchema, + RerouteOrCondition, +} from '../../../../common/types'; + +function isFilterCondition(subject: any): subject is FilterCondition { + const result = filterConditionSchema.safeParse(subject); + return result.success; +} + +function isAndCondition(subject: any): subject is AndCondition { + const result = conditionSchema.safeParse(subject); + return result.success && subject.and != null; +} + +function isOrCondition(subject: any): subject is RerouteOrCondition { + const result = conditionSchema.safeParse(subject); + return result.success && subject.or != null; +} + +function safePainlessField(condition: FilterCondition) { + return `ctx.${condition.field.split('.').join('?.')}`; +} + +function encodeValue(value: string | number | boolean) { + if (isString(value)) { + return `"${value}"`; + } + if (isBoolean(value)) { + return value ? 'true' : 'false'; + } + return value; +} + +function toPainless(condition: FilterCondition) { + switch (condition.operator) { + case 'neq': + return `${safePainlessField(condition)} != ${encodeValue(condition.value)}`; + case 'lt': + return `${safePainlessField(condition)} < ${encodeValue(condition.value)}`; + case 'lte': + return `${safePainlessField(condition)} <= ${encodeValue(condition.value)}`; + case 'gt': + return `${safePainlessField(condition)} > ${encodeValue(condition.value)}`; + case 'gte': + return `${safePainlessField(condition)} >= ${encodeValue(condition.value)}`; + case 'startsWith': + return `${safePainlessField(condition)}.startsWith(${encodeValue(condition.value)})`; + case 'endsWith': + return `${safePainlessField(condition)}.endsWith(${encodeValue(condition.value)})`; + case 'contains': + return `${safePainlessField(condition)}.contains(${encodeValue(condition.value)})`; + default: + return `${safePainlessField(condition)} == ${encodeValue(condition.value)}`; + } +} + +export function conditionToPainless(condition: Condition, nested = false): string { + if (isFilterCondition(condition)) { + return toPainless(condition); + } + if (isAndCondition(condition)) { + const and = condition.and.map((filter) => conditionToPainless(filter, true)).join(' && '); + return nested ? `(${and})` : and; + } + if (isOrCondition(condition)) { + const or = condition.or.map((filter) => conditionToPainless(filter, true)).join(' || '); + return nested ? `(${or})` : or; + } + return 'false'; +} diff --git a/x-pack/plugins/streams/server/lib/streams/helpers/hierarchy.ts b/x-pack/plugins/streams/server/lib/streams/helpers/hierarchy.ts new file mode 100644 index 0000000000000..6f1cd308f3c3d --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/helpers/hierarchy.ts @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StreamDefinition } from '../../../../common/types'; + +export function isDescendandOf(parent: StreamDefinition, child: StreamDefinition) { + return child.id.startsWith(parent.id); +} + +export function isChildOf(parent: StreamDefinition, child: StreamDefinition) { + return ( + isDescendandOf(parent, child) && child.id.split('.').length === parent.id.split('.').length + 1 + ); +} + +export function getParentId(id: string) { + const parts = id.split('.'); + if (parts.length === 1) { + return undefined; + } + return parts.slice(0, parts.length - 1).join('.'); +} + +export function isRoot(id: string) { + return id.split('.').length === 1; +} + +export function getAncestors(id: string) { + const parts = id.split('.'); + return parts.slice(0, parts.length - 1).map((_, index) => parts.slice(0, index + 1).join('.')); +} diff --git a/x-pack/plugins/streams/server/lib/streams/helpers/retry.ts b/x-pack/plugins/streams/server/lib/streams/helpers/retry.ts new file mode 100644 index 0000000000000..32604a22bf9be --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/helpers/retry.ts @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { setTimeout } from 'timers/promises'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +import type { Logger } from '@kbn/logging'; +import { SecurityException } from '../errors'; + +const MAX_ATTEMPTS = 5; + +const retryResponseStatuses = [ + 503, // ServiceUnavailable + 408, // RequestTimeout + 410, // Gone +]; + +const isRetryableError = (e: any) => + e instanceof EsErrors.NoLivingConnectionsError || + e instanceof EsErrors.ConnectionError || + e instanceof EsErrors.TimeoutError || + (e instanceof EsErrors.ResponseError && retryResponseStatuses.includes(e?.statusCode!)); + +/** + * Retries any transient network or configuration issues encountered from Elasticsearch with an exponential backoff. + * Should only be used to wrap operations that are idempotent and can be safely executed more than once. + */ +export const retryTransientEsErrors = async ( + esCall: () => Promise, + { logger, attempt = 0 }: { logger?: Logger; attempt?: number } = {} +): Promise => { + try { + return await esCall(); + } catch (e) { + if (attempt < MAX_ATTEMPTS && isRetryableError(e)) { + const retryCount = attempt + 1; + const retryDelaySec = Math.min(Math.pow(2, retryCount), 64); // 2s, 4s, 8s, 16s, 32s, 64s, 64s, 64s ... + + logger?.warn( + `Retrying Elasticsearch operation after [${retryDelaySec}s] due to error: ${e.toString()} ${ + e.stack + }` + ); + + await setTimeout(retryDelaySec * 1000); + return retryTransientEsErrors(esCall, { logger, attempt: retryCount }); + } + + if (e.meta?.body?.error?.type === 'security_exception') { + throw new SecurityException(e.meta.body.error.reason); + } + + throw e; + } +}; diff --git a/x-pack/plugins/streams/server/lib/streams/index_templates/generate_index_template.ts b/x-pack/plugins/streams/server/lib/streams/index_templates/generate_index_template.ts new file mode 100644 index 0000000000000..7a16534a618da --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/index_templates/generate_index_template.ts @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ASSET_VERSION } from '../../../../common/constants'; +import { getProcessingPipelineName } from '../ingest_pipelines/name'; +import { getIndexTemplateName } from './name'; + +export function generateIndexTemplate(id: string) { + const composedOf = id.split('.').reduce((acc, _, index, array) => { + const parent = array.slice(0, index + 1).join('.'); + return [...acc, `${parent}@stream.layer`]; + }, [] as string[]); + + return { + name: getIndexTemplateName(id), + index_patterns: [id], + composed_of: composedOf, + priority: 200, + version: ASSET_VERSION, + _meta: { + managed: true, + description: `The index template for ${id} stream`, + }, + data_stream: { + hidden: false, + }, + template: { + settings: { + index: { + default_pipeline: getProcessingPipelineName(id), + }, + }, + }, + allow_auto_create: true, + // ignore missing component templates to be more robust against out-of-order syncs + ignore_missing_component_templates: composedOf, + }; +} diff --git a/x-pack/plugins/streams/server/lib/streams/index_templates/manage_index_templates.ts b/x-pack/plugins/streams/server/lib/streams/index_templates/manage_index_templates.ts new file mode 100644 index 0000000000000..9383e698b3436 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/index_templates/manage_index_templates.ts @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IndicesPutIndexTemplateRequest } from '@elastic/elasticsearch/lib/api/types'; +import { ElasticsearchClient, Logger } from '@kbn/core/server'; +import { retryTransientEsErrors } from '../helpers/retry'; + +interface TemplateManagementOptions { + esClient: ElasticsearchClient; + template: IndicesPutIndexTemplateRequest; + logger: Logger; +} + +interface DeleteTemplateOptions { + esClient: ElasticsearchClient; + name: string; + logger: Logger; +} + +export async function upsertTemplate({ esClient, template, logger }: TemplateManagementOptions) { + try { + await retryTransientEsErrors(() => esClient.indices.putIndexTemplate(template), { logger }); + logger.debug(() => `Installed index template: ${JSON.stringify(template)}`); + } catch (error: any) { + logger.error(`Error updating index template: ${error.message}`); + throw error; + } +} + +export async function deleteTemplate({ esClient, name, logger }: DeleteTemplateOptions) { + try { + await retryTransientEsErrors( + () => esClient.indices.deleteIndexTemplate({ name }, { ignore: [404] }), + { logger } + ); + } catch (error: any) { + logger.error(`Error deleting index template: ${error.message}`); + throw error; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/index_templates/name.ts b/x-pack/plugins/streams/server/lib/streams/index_templates/name.ts new file mode 100644 index 0000000000000..ec8ea5519a6b4 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/index_templates/name.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export function getIndexTemplateName(id: string) { + return `${id}@stream`; +} diff --git a/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts new file mode 100644 index 0000000000000..eb09df8831304 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StreamDefinition } from '../../../../common/types'; +import { ASSET_VERSION } from '../../../../common/constants'; +import { conditionToPainless } from '../helpers/condition_to_painless'; +import { logsDefaultPipelineProcessors } from './logs_default_pipeline'; +import { isRoot } from '../helpers/hierarchy'; +import { getProcessingPipelineName } from './name'; + +export function generateIngestPipeline(id: string, definition: StreamDefinition) { + return { + id: getProcessingPipelineName(id), + processors: [ + ...(isRoot(definition.id) ? logsDefaultPipelineProcessors : []), + ...definition.processing.map((processor) => { + const { type, ...config } = processor.config; + return { + [type]: { + ...config, + if: processor.condition ? conditionToPainless(processor.condition) : undefined, + }, + }; + }), + { + pipeline: { + name: `${id}@stream.reroutes`, + ignore_missing_pipeline: true, + }, + }, + ], + _meta: { + description: `Default pipeline for the ${id} stream`, + managed: true, + }, + version: ASSET_VERSION, + }; +} diff --git a/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts new file mode 100644 index 0000000000000..9b46e0cf4ac92 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StreamDefinition } from '../../../../common/types'; +import { ASSET_VERSION } from '../../../../common/constants'; +import { conditionToPainless } from '../helpers/condition_to_painless'; +import { getReroutePipelineName } from './name'; + +interface GenerateReroutePipelineParams { + definition: StreamDefinition; +} + +export async function generateReroutePipeline({ definition }: GenerateReroutePipelineParams) { + return { + id: getReroutePipelineName(definition.id), + processors: definition.children.map((child) => { + return { + reroute: { + destination: child.id, + if: conditionToPainless(child.condition), + }, + }; + }), + _meta: { + description: `Reoute pipeline for the ${definition.id} stream`, + managed: true, + }, + version: ASSET_VERSION, + }; +} diff --git a/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/logs_default_pipeline.ts b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/logs_default_pipeline.ts new file mode 100644 index 0000000000000..762155ba5047c --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/logs_default_pipeline.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const logsDefaultPipelineProcessors = [ + { + set: { + description: "If '@timestamp' is missing, set it with the ingest timestamp", + field: '@timestamp', + override: false, + copy_from: '_ingest.timestamp', + }, + }, + { + pipeline: { + name: 'logs@json-pipeline', + ignore_missing_pipeline: true, + }, + }, +]; diff --git a/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/manage_ingest_pipelines.ts b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/manage_ingest_pipelines.ts new file mode 100644 index 0000000000000..467e2efb48f0d --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/manage_ingest_pipelines.ts @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import { Logger } from '@kbn/logging'; +import { IngestPutPipelineRequest } from '@elastic/elasticsearch/lib/api/types'; +import { retryTransientEsErrors } from '../helpers/retry'; + +interface DeletePipelineOptions { + esClient: ElasticsearchClient; + id: string; + logger: Logger; +} + +interface PipelineManagementOptions { + esClient: ElasticsearchClient; + pipeline: IngestPutPipelineRequest; + logger: Logger; +} + +export async function deleteIngestPipeline({ esClient, id, logger }: DeletePipelineOptions) { + try { + await retryTransientEsErrors(() => esClient.ingest.deletePipeline({ id }, { ignore: [404] }), { + logger, + }); + } catch (error: any) { + logger.error(`Error deleting ingest pipeline: ${error.message}`); + throw error; + } +} + +export async function upsertIngestPipeline({ + esClient, + pipeline, + logger, +}: PipelineManagementOptions) { + try { + await retryTransientEsErrors(() => esClient.ingest.putPipeline(pipeline), { logger }); + logger.debug(() => `Installed index template: ${JSON.stringify(pipeline)}`); + } catch (error: any) { + logger.error(`Error updating index template: ${error.message}`); + throw error; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/name.ts b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/name.ts new file mode 100644 index 0000000000000..8d2a97ff3137f --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/name.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export function getProcessingPipelineName(id: string) { + return `${id}@stream.processing`; +} + +export function getReroutePipelineName(id: string) { + return `${id}@stream.reroutes`; +} diff --git a/x-pack/plugins/streams/server/lib/streams/internal_stream_mapping.ts b/x-pack/plugins/streams/server/lib/streams/internal_stream_mapping.ts new file mode 100644 index 0000000000000..8e88eeef8cd84 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/internal_stream_mapping.ts @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import { STREAMS_INDEX } from '../../../common/constants'; + +export function createStreamsIndex(scopedClusterClient: IScopedClusterClient) { + return scopedClusterClient.asInternalUser.indices.create({ + index: STREAMS_INDEX, + mappings: { + dynamic: 'strict', + properties: { + processing: { + type: 'object', + enabled: false, + }, + fields: { + type: 'object', + enabled: false, + }, + children: { + type: 'object', + enabled: false, + }, + id: { + type: 'keyword', + }, + }, + }, + }); +} diff --git a/x-pack/plugins/streams/server/lib/streams/root_stream_definition.ts b/x-pack/plugins/streams/server/lib/streams/root_stream_definition.ts new file mode 100644 index 0000000000000..2b7deed877309 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/root_stream_definition.ts @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StreamDefinition } from '../../../common/types'; + +export const rootStreamDefinition: StreamDefinition = { + id: 'logs', + processing: [], + children: [], + fields: [ + { + name: '@timestamp', + type: 'date', + }, + { + name: 'message', + type: 'match_only_text', + }, + { + name: 'host.name', + type: 'keyword', + }, + { + name: 'log.level', + type: 'keyword', + }, + ], +}; diff --git a/x-pack/plugins/streams/server/lib/streams/stream_crud.ts b/x-pack/plugins/streams/server/lib/streams/stream_crud.ts new file mode 100644 index 0000000000000..78a126905d9a4 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/stream_crud.ts @@ -0,0 +1,286 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import { Logger } from '@kbn/logging'; +import { FieldDefinition, StreamDefinition } from '../../../common/types'; +import { STREAMS_INDEX } from '../../../common/constants'; +import { DefinitionNotFound } from './errors'; +import { deleteTemplate, upsertTemplate } from './index_templates/manage_index_templates'; +import { generateLayer } from './component_templates/generate_layer'; +import { generateIngestPipeline } from './ingest_pipelines/generate_ingest_pipeline'; +import { generateReroutePipeline } from './ingest_pipelines/generate_reroute_pipeline'; +import { generateIndexTemplate } from './index_templates/generate_index_template'; +import { deleteComponent, upsertComponent } from './component_templates/manage_component_templates'; +import { getIndexTemplateName } from './index_templates/name'; +import { getComponentTemplateName } from './component_templates/name'; +import { getProcessingPipelineName, getReroutePipelineName } from './ingest_pipelines/name'; +import { + deleteIngestPipeline, + upsertIngestPipeline, +} from './ingest_pipelines/manage_ingest_pipelines'; +import { getAncestors } from './helpers/hierarchy'; +import { MalformedFields } from './errors/malformed_fields'; +import { + deleteDataStream, + rolloverDataStreamIfNecessary, + upsertDataStream, +} from './data_streams/manage_data_streams'; + +interface BaseParams { + scopedClusterClient: IScopedClusterClient; +} + +interface BaseParamsWithDefinition extends BaseParams { + definition: StreamDefinition; +} + +interface DeleteStreamParams extends BaseParams { + id: string; + logger: Logger; +} + +export async function deleteStreamObjects({ id, scopedClusterClient, logger }: DeleteStreamParams) { + await deleteDataStream({ + esClient: scopedClusterClient.asCurrentUser, + name: id, + logger, + }); + await deleteTemplate({ + esClient: scopedClusterClient.asCurrentUser, + name: getIndexTemplateName(id), + logger, + }); + await deleteComponent({ + esClient: scopedClusterClient.asCurrentUser, + name: getComponentTemplateName(id), + logger, + }); + await deleteIngestPipeline({ + esClient: scopedClusterClient.asCurrentUser, + id: getProcessingPipelineName(id), + logger, + }); + await deleteIngestPipeline({ + esClient: scopedClusterClient.asCurrentUser, + id: getReroutePipelineName(id), + logger, + }); + await scopedClusterClient.asInternalUser.delete({ + id, + index: STREAMS_INDEX, + refresh: 'wait_for', + }); +} + +async function upsertInternalStream({ definition, scopedClusterClient }: BaseParamsWithDefinition) { + return scopedClusterClient.asInternalUser.index({ + id: definition.id, + index: STREAMS_INDEX, + document: definition, + refresh: 'wait_for', + }); +} + +type ListStreamsParams = BaseParams; + +export async function listStreams({ scopedClusterClient }: ListStreamsParams) { + const response = await scopedClusterClient.asInternalUser.search({ + index: STREAMS_INDEX, + size: 10000, + fields: ['id'], + _source: false, + sort: [{ id: 'asc' }], + }); + const definitions = response.hits.hits.map((hit) => hit.fields as { id: string[] }); + return definitions; +} + +interface ReadStreamParams extends BaseParams { + id: string; +} + +export async function readStream({ id, scopedClusterClient }: ReadStreamParams) { + try { + const response = await scopedClusterClient.asInternalUser.get({ + id, + index: STREAMS_INDEX, + }); + const definition = response._source as StreamDefinition; + return { + definition, + }; + } catch (e) { + if (e.meta?.statusCode === 404) { + throw new DefinitionNotFound(`Stream definition for ${id} not found.`); + } + throw e; + } +} + +interface ReadAncestorsParams extends BaseParams { + id: string; +} + +export async function readAncestors({ id, scopedClusterClient }: ReadAncestorsParams) { + const ancestorIds = getAncestors(id); + + return await Promise.all( + ancestorIds.map((ancestorId) => readStream({ scopedClusterClient, id: ancestorId })) + ); +} + +interface ReadDescendantsParams extends BaseParams { + id: string; +} + +export async function readDescendants({ id, scopedClusterClient }: ReadDescendantsParams) { + const response = await scopedClusterClient.asInternalUser.search({ + index: STREAMS_INDEX, + size: 10000, + body: { + query: { + bool: { + filter: { + prefix: { + id, + }, + }, + must_not: { + term: { + id, + }, + }, + }, + }, + }, + }); + return response.hits.hits.map((hit) => hit._source as StreamDefinition); +} + +export async function validateAncestorFields( + scopedClusterClient: IScopedClusterClient, + id: string, + fields: FieldDefinition[] +) { + const ancestors = await readAncestors({ + id, + scopedClusterClient, + }); + for (const ancestor of ancestors) { + for (const field of fields) { + if ( + ancestor.definition.fields.some( + (ancestorField) => ancestorField.type !== field.type && ancestorField.name === field.name + ) + ) { + throw new MalformedFields( + `Field ${field.name} is already defined with incompatible type in the parent stream ${ancestor.definition.id}` + ); + } + } + } +} + +export async function validateDescendantFields( + scopedClusterClient: IScopedClusterClient, + id: string, + fields: FieldDefinition[] +) { + const descendants = await readDescendants({ + id, + scopedClusterClient, + }); + for (const descendant of descendants) { + for (const field of fields) { + if ( + descendant.fields.some( + (descendantField) => + descendantField.type !== field.type && descendantField.name === field.name + ) + ) { + throw new MalformedFields( + `Field ${field.name} is already defined with incompatible type in the child stream ${descendant.id}` + ); + } + } + } +} + +export async function checkStreamExists({ id, scopedClusterClient }: ReadStreamParams) { + try { + await readStream({ id, scopedClusterClient }); + return true; + } catch (e) { + if (e instanceof DefinitionNotFound) { + return false; + } + throw e; + } +} + +interface SyncStreamParams { + scopedClusterClient: IScopedClusterClient; + definition: StreamDefinition; + rootDefinition?: StreamDefinition; + logger: Logger; +} + +export async function syncStream({ + scopedClusterClient, + definition, + rootDefinition, + logger, +}: SyncStreamParams) { + await upsertComponent({ + esClient: scopedClusterClient.asCurrentUser, + logger, + component: generateLayer(definition.id, definition), + }); + await upsertIngestPipeline({ + esClient: scopedClusterClient.asCurrentUser, + logger, + pipeline: generateIngestPipeline(definition.id, definition), + }); + const reroutePipeline = await generateReroutePipeline({ + definition, + }); + await upsertIngestPipeline({ + esClient: scopedClusterClient.asCurrentUser, + logger, + pipeline: reroutePipeline, + }); + await upsertTemplate({ + esClient: scopedClusterClient.asCurrentUser, + logger, + template: generateIndexTemplate(definition.id), + }); + if (rootDefinition) { + const parentReroutePipeline = await generateReroutePipeline({ + definition: rootDefinition, + }); + await upsertIngestPipeline({ + esClient: scopedClusterClient.asCurrentUser, + logger, + pipeline: parentReroutePipeline, + }); + } + await upsertDataStream({ + esClient: scopedClusterClient.asCurrentUser, + logger, + name: definition.id, + }); + await upsertInternalStream({ + scopedClusterClient, + definition, + }); + await rolloverDataStreamIfNecessary({ + esClient: scopedClusterClient.asCurrentUser, + name: definition.id, + logger, + }); +} diff --git a/x-pack/plugins/streams/server/plugin.ts b/x-pack/plugins/streams/server/plugin.ts new file mode 100644 index 0000000000000..ef070984803d5 --- /dev/null +++ b/x-pack/plugins/streams/server/plugin.ts @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + CoreSetup, + CoreStart, + KibanaRequest, + Logger, + Plugin, + PluginConfigDescriptor, + PluginInitializerContext, +} from '@kbn/core/server'; +import { registerRoutes } from '@kbn/server-route-repository'; +import { StreamsConfig, configSchema, exposeToBrowserConfig } from '../common/config'; +import { StreamsRouteRepository } from './routes'; +import { RouteDependencies } from './routes/types'; +import { + StreamsPluginSetupDependencies, + StreamsPluginStartDependencies, + StreamsServer, +} from './types'; + +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface StreamsPluginSetup {} +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface StreamsPluginStart {} + +export const config: PluginConfigDescriptor = { + schema: configSchema, + exposeToBrowser: exposeToBrowserConfig, +}; + +export class StreamsPlugin + implements + Plugin< + StreamsPluginSetup, + StreamsPluginStart, + StreamsPluginSetupDependencies, + StreamsPluginStartDependencies + > +{ + public config: StreamsConfig; + public logger: Logger; + public server?: StreamsServer; + + constructor(context: PluginInitializerContext) { + this.config = context.config.get(); + this.logger = context.logger.get(); + } + + public setup(core: CoreSetup, plugins: StreamsPluginSetupDependencies): StreamsPluginSetup { + this.server = { + config: this.config, + logger: this.logger, + } as StreamsServer; + + registerRoutes({ + repository: StreamsRouteRepository, + dependencies: { + server: this.server, + getScopedClients: async ({ request }: { request: KibanaRequest }) => { + const [coreStart] = await core.getStartServices(); + const scopedClusterClient = coreStart.elasticsearch.client.asScoped(request); + const soClient = coreStart.savedObjects.getScopedClient(request); + return { scopedClusterClient, soClient }; + }, + }, + core, + logger: this.logger, + }); + + return {}; + } + + public start(core: CoreStart, plugins: StreamsPluginStartDependencies): StreamsPluginStart { + if (this.server) { + this.server.core = core; + this.server.isServerless = core.elasticsearch.getCapabilities().serverless; + this.server.security = plugins.security; + this.server.encryptedSavedObjects = plugins.encryptedSavedObjects; + this.server.taskManager = plugins.taskManager; + } + + return {}; + } + + public stop() {} +} diff --git a/x-pack/plugins/streams/server/routes/create_server_route.ts b/x-pack/plugins/streams/server/routes/create_server_route.ts new file mode 100644 index 0000000000000..94d85a71c82bb --- /dev/null +++ b/x-pack/plugins/streams/server/routes/create_server_route.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { createServerRouteFactory } from '@kbn/server-route-repository'; +import { StreamsRouteHandlerResources } from './types'; + +export const createServerRoute = createServerRouteFactory(); diff --git a/x-pack/plugins/streams/server/routes/index.ts b/x-pack/plugins/streams/server/routes/index.ts new file mode 100644 index 0000000000000..6fc734d3371b4 --- /dev/null +++ b/x-pack/plugins/streams/server/routes/index.ts @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { deleteStreamRoute } from './streams/delete'; +import { editStreamRoute } from './streams/edit'; +import { enableStreamsRoute } from './streams/enable'; +import { forkStreamsRoute } from './streams/fork'; +import { listStreamsRoute } from './streams/list'; +import { readStreamRoute } from './streams/read'; +import { resyncStreamsRoute } from './streams/resync'; + +export const StreamsRouteRepository = { + ...enableStreamsRoute, + ...resyncStreamsRoute, + ...forkStreamsRoute, + ...readStreamRoute, + ...editStreamRoute, + ...deleteStreamRoute, + ...listStreamsRoute, +}; + +export type StreamsRouteRepository = typeof StreamsRouteRepository; diff --git a/x-pack/plugins/streams/server/routes/streams/delete.ts b/x-pack/plugins/streams/server/routes/streams/delete.ts new file mode 100644 index 0000000000000..3820975dbe16a --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/delete.ts @@ -0,0 +1,109 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import { Logger } from '@kbn/logging'; +import { + DefinitionNotFound, + ForkConditionMissing, + IndexTemplateNotFound, + SecurityException, +} from '../../lib/streams/errors'; +import { createServerRoute } from '../create_server_route'; +import { syncStream, readStream, deleteStreamObjects } from '../../lib/streams/stream_crud'; +import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id'; +import { getParentId } from '../../lib/streams/helpers/hierarchy'; + +export const deleteStreamRoute = createServerRoute({ + endpoint: 'DELETE /api/streams/{id} 2023-10-31', + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + params: z.object({ + path: z.object({ + id: z.string(), + }), + }), + handler: async ({ response, params, logger, request, getScopedClients }) => { + try { + const { scopedClusterClient } = await getScopedClients({ request }); + + const parentId = getParentId(params.path.id); + if (!parentId) { + throw new MalformedStreamId('Cannot delete root stream'); + } + + await updateParentStream(scopedClusterClient, params.path.id, parentId, logger); + + await deleteStream(scopedClusterClient, params.path.id, logger); + + return response.ok({ body: { acknowledged: true } }); + } catch (e) { + if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) { + return response.notFound({ body: e }); + } + + if ( + e instanceof SecurityException || + e instanceof ForkConditionMissing || + e instanceof MalformedStreamId + ) { + return response.customError({ body: e, statusCode: 400 }); + } + + return response.customError({ body: e, statusCode: 500 }); + } + }, +}); + +async function deleteStream(scopedClusterClient: IScopedClusterClient, id: string, logger: Logger) { + try { + const { definition } = await readStream({ scopedClusterClient, id }); + for (const child of definition.children) { + await deleteStream(scopedClusterClient, child.id, logger); + } + await deleteStreamObjects({ scopedClusterClient, id, logger }); + } catch (e) { + if (e instanceof DefinitionNotFound) { + logger.debug(`Stream definition for ${id} not found.`); + } else { + throw e; + } + } +} + +async function updateParentStream( + scopedClusterClient: IScopedClusterClient, + id: string, + parentId: string, + logger: Logger +) { + const { definition: parentDefinition } = await readStream({ + scopedClusterClient, + id: parentId, + }); + + parentDefinition.children = parentDefinition.children.filter((child) => child.id !== id); + + await syncStream({ + scopedClusterClient, + definition: parentDefinition, + logger, + }); + return parentDefinition; +} diff --git a/x-pack/plugins/streams/server/routes/streams/edit.ts b/x-pack/plugins/streams/server/routes/streams/edit.ts new file mode 100644 index 0000000000000..b82b4d54044da --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/edit.ts @@ -0,0 +1,171 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import { Logger } from '@kbn/logging'; +import { + DefinitionNotFound, + ForkConditionMissing, + IndexTemplateNotFound, + SecurityException, +} from '../../lib/streams/errors'; +import { createServerRoute } from '../create_server_route'; +import { StreamDefinition, streamWithoutIdDefinitonSchema } from '../../../common/types'; +import { + syncStream, + readStream, + checkStreamExists, + validateAncestorFields, + validateDescendantFields, +} from '../../lib/streams/stream_crud'; +import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id'; +import { getParentId } from '../../lib/streams/helpers/hierarchy'; +import { MalformedChildren } from '../../lib/streams/errors/malformed_children'; + +export const editStreamRoute = createServerRoute({ + endpoint: 'PUT /api/streams/{id} 2023-10-31', + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + params: z.object({ + path: z.object({ + id: z.string(), + }), + body: streamWithoutIdDefinitonSchema, + }), + handler: async ({ response, params, logger, request, getScopedClients }) => { + try { + const { scopedClusterClient } = await getScopedClients({ request }); + + await validateStreamChildren(scopedClusterClient, params.path.id, params.body.children); + await validateAncestorFields(scopedClusterClient, params.path.id, params.body.fields); + await validateDescendantFields(scopedClusterClient, params.path.id, params.body.fields); + + const parentId = getParentId(params.path.id); + let parentDefinition: StreamDefinition | undefined; + + if (parentId) { + parentDefinition = await updateParentStream( + scopedClusterClient, + parentId, + params.path.id, + logger + ); + } + const streamDefinition = { ...params.body }; + + await syncStream({ + scopedClusterClient, + definition: { ...streamDefinition, id: params.path.id }, + rootDefinition: parentDefinition, + logger, + }); + + for (const child of streamDefinition.children) { + const streamExists = await checkStreamExists({ + scopedClusterClient, + id: child.id, + }); + if (streamExists) { + continue; + } + // create empty streams for each child if they don't exist + const childDefinition = { + id: child.id, + children: [], + fields: [], + processing: [], + }; + + await syncStream({ + scopedClusterClient, + definition: childDefinition, + logger, + }); + } + + return response.ok({ body: { acknowledged: true } }); + } catch (e) { + if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) { + return response.notFound({ body: e }); + } + + if ( + e instanceof SecurityException || + e instanceof ForkConditionMissing || + e instanceof MalformedStreamId + ) { + return response.customError({ body: e, statusCode: 400 }); + } + + return response.customError({ body: e, statusCode: 500 }); + } + }, +}); + +async function updateParentStream( + scopedClusterClient: IScopedClusterClient, + parentId: string, + id: string, + logger: Logger +) { + const { definition: parentDefinition } = await readStream({ + scopedClusterClient, + id: parentId, + }); + + if (!parentDefinition.children.some((child) => child.id === id)) { + // add the child to the parent stream with an empty condition for now + parentDefinition.children.push({ + id, + condition: undefined, + }); + + await syncStream({ + scopedClusterClient, + definition: parentDefinition, + logger, + }); + } + return parentDefinition; +} + +async function validateStreamChildren( + scopedClusterClient: IScopedClusterClient, + id: string, + children: Array<{ id: string }> +) { + try { + const { definition: oldDefinition } = await readStream({ + scopedClusterClient, + id, + }); + const oldChildren = oldDefinition.children.map((child) => child.id); + const newChildren = new Set(children.map((child) => child.id)); + if (oldChildren.some((child) => !newChildren.has(child))) { + throw new MalformedChildren( + 'Cannot remove children from a stream, please delete the stream instead' + ); + } + } catch (e) { + // Ignore if the stream does not exist, but re-throw if it's another error + if (!(e instanceof DefinitionNotFound)) { + throw e; + } + } +} diff --git a/x-pack/plugins/streams/server/routes/streams/enable.ts b/x-pack/plugins/streams/server/routes/streams/enable.ts new file mode 100644 index 0000000000000..27d8929b28e50 --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/enable.ts @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { SecurityException } from '../../lib/streams/errors'; +import { createServerRoute } from '../create_server_route'; +import { syncStream } from '../../lib/streams/stream_crud'; +import { rootStreamDefinition } from '../../lib/streams/root_stream_definition'; +import { createStreamsIndex } from '../../lib/streams/internal_stream_mapping'; + +export const enableStreamsRoute = createServerRoute({ + endpoint: 'POST /api/streams/_enable 2023-10-31', + params: z.object({}), + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + handler: async ({ request, response, logger, getScopedClients }) => { + try { + const { scopedClusterClient } = await getScopedClients({ request }); + await createStreamsIndex(scopedClusterClient); + await syncStream({ + scopedClusterClient, + definition: rootStreamDefinition, + logger, + }); + return response.ok({ body: { acknowledged: true } }); + } catch (e) { + if (e instanceof SecurityException) { + return response.customError({ body: e, statusCode: 400 }); + } + return response.customError({ body: e, statusCode: 500 }); + } + }, +}); diff --git a/x-pack/plugins/streams/server/routes/streams/fork.ts b/x-pack/plugins/streams/server/routes/streams/fork.ts new file mode 100644 index 0000000000000..44f4052878003 --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/fork.ts @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { + DefinitionNotFound, + ForkConditionMissing, + IndexTemplateNotFound, + SecurityException, +} from '../../lib/streams/errors'; +import { createServerRoute } from '../create_server_route'; +import { conditionSchema, streamDefinitonWithoutChildrenSchema } from '../../../common/types'; +import { syncStream, readStream, validateAncestorFields } from '../../lib/streams/stream_crud'; +import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id'; +import { isChildOf } from '../../lib/streams/helpers/hierarchy'; + +export const forkStreamsRoute = createServerRoute({ + endpoint: 'POST /api/streams/{id}/_fork 2023-10-31', + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + params: z.object({ + path: z.object({ + id: z.string(), + }), + body: z.object({ stream: streamDefinitonWithoutChildrenSchema, condition: conditionSchema }), + }), + handler: async ({ response, params, logger, request, getScopedClients }) => { + try { + if (!params.body.condition) { + throw new ForkConditionMissing('You must provide a condition to fork a stream'); + } + + const { scopedClusterClient } = await getScopedClients({ request }); + + const { definition: rootDefinition } = await readStream({ + scopedClusterClient, + id: params.path.id, + }); + + const childDefinition = { ...params.body.stream, children: [] }; + + // check whether root stream has a child of the given name already + if (rootDefinition.children.some((child) => child.id === childDefinition.id)) { + throw new MalformedStreamId( + `The stream with ID (${params.body.stream.id}) already exists as a child of the parent stream` + ); + } + + if (!isChildOf(rootDefinition, childDefinition)) { + throw new MalformedStreamId( + `The ID (${params.body.stream.id}) from the new stream must start with the parent's id (${rootDefinition.id}), followed by a dot and a name` + ); + } + + await validateAncestorFields( + scopedClusterClient, + params.body.stream.id, + params.body.stream.fields + ); + + rootDefinition.children.push({ + id: params.body.stream.id, + condition: params.body.condition, + }); + + await syncStream({ + scopedClusterClient, + definition: rootDefinition, + rootDefinition, + logger, + }); + + await syncStream({ + scopedClusterClient, + definition: childDefinition, + rootDefinition, + logger, + }); + + return response.ok({ body: { acknowledged: true } }); + } catch (e) { + if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) { + return response.notFound({ body: e }); + } + + if ( + e instanceof SecurityException || + e instanceof ForkConditionMissing || + e instanceof MalformedStreamId + ) { + return response.customError({ body: e, statusCode: 400 }); + } + + return response.customError({ body: e, statusCode: 500 }); + } + }, +}); diff --git a/x-pack/plugins/streams/server/routes/streams/list.ts b/x-pack/plugins/streams/server/routes/streams/list.ts new file mode 100644 index 0000000000000..2e4f13a89bb41 --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/list.ts @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { createServerRoute } from '../create_server_route'; +import { DefinitionNotFound } from '../../lib/streams/errors'; +import { listStreams } from '../../lib/streams/stream_crud'; + +export const listStreamsRoute = createServerRoute({ + endpoint: 'GET /api/streams 2023-10-31', + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + params: z.object({}), + handler: async ({ response, request, getScopedClients }) => { + try { + const { scopedClusterClient } = await getScopedClients({ request }); + const definitions = await listStreams({ scopedClusterClient }); + + const trees = asTrees(definitions); + + return response.ok({ body: { streams: trees } }); + } catch (e) { + if (e instanceof DefinitionNotFound) { + return response.notFound({ body: e }); + } + + return response.customError({ body: e, statusCode: 500 }); + } + }, +}); + +interface ListStreamDefinition { + id: string; + children: ListStreamDefinition[]; +} + +function asTrees(definitions: Array<{ id: string[] }>) { + const trees: ListStreamDefinition[] = []; + definitions.forEach((definition) => { + const path = definition.id[0].split('.'); + let currentTree = trees; + path.forEach((_id, index) => { + const partialPath = path.slice(0, index + 1).join('.'); + const existingNode = currentTree.find((node) => node.id === partialPath); + if (existingNode) { + currentTree = existingNode.children; + } else { + const newNode = { id: partialPath, children: [] }; + currentTree.push(newNode); + currentTree = newNode.children; + } + }); + }); + return trees; +} diff --git a/x-pack/plugins/streams/server/routes/streams/read.ts b/x-pack/plugins/streams/server/routes/streams/read.ts new file mode 100644 index 0000000000000..5ea2aaf5f2542 --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/read.ts @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { createServerRoute } from '../create_server_route'; +import { DefinitionNotFound } from '../../lib/streams/errors'; +import { readAncestors, readStream } from '../../lib/streams/stream_crud'; + +export const readStreamRoute = createServerRoute({ + endpoint: 'GET /api/streams/{id} 2023-10-31', + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + params: z.object({ + path: z.object({ id: z.string() }), + }), + handler: async ({ response, params, request, getScopedClients }) => { + try { + const { scopedClusterClient } = await getScopedClients({ request }); + const streamEntity = await readStream({ + scopedClusterClient, + id: params.path.id, + }); + + const ancestors = await readAncestors({ + id: streamEntity.definition.id, + scopedClusterClient, + }); + + const body = { + ...streamEntity.definition, + inheritedFields: ancestors.flatMap(({ definition: { id, fields } }) => + fields.map((field) => ({ ...field, from: id })) + ), + }; + + return response.ok({ body }); + } catch (e) { + if (e instanceof DefinitionNotFound) { + return response.notFound({ body: e }); + } + + return response.customError({ body: e, statusCode: 500 }); + } + }, +}); diff --git a/x-pack/plugins/streams/server/routes/streams/resync.ts b/x-pack/plugins/streams/server/routes/streams/resync.ts new file mode 100644 index 0000000000000..2365252ab00e6 --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/resync.ts @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { createServerRoute } from '../create_server_route'; +import { syncStream, readStream, listStreams } from '../../lib/streams/stream_crud'; + +export const resyncStreamsRoute = createServerRoute({ + endpoint: 'POST /api/streams/_resync 2023-10-31', + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + params: z.object({}), + handler: async ({ response, logger, request, getScopedClients }) => { + const { scopedClusterClient } = await getScopedClients({ request }); + + const streams = await listStreams({ scopedClusterClient }); + + for (const stream of streams) { + const { definition } = await readStream({ + scopedClusterClient, + id: stream.id[0], + }); + await syncStream({ + scopedClusterClient, + definition, + logger, + }); + } + + return response.ok({}); + }, +}); diff --git a/x-pack/plugins/streams/server/routes/types.ts b/x-pack/plugins/streams/server/routes/types.ts new file mode 100644 index 0000000000000..d547d56c088cd --- /dev/null +++ b/x-pack/plugins/streams/server/routes/types.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { KibanaRequest } from '@kbn/core-http-server'; +import { DefaultRouteHandlerResources } from '@kbn/server-route-repository'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-server'; +import { StreamsServer } from '../types'; + +export interface RouteDependencies { + server: StreamsServer; + getScopedClients: ({ request }: { request: KibanaRequest }) => Promise<{ + scopedClusterClient: IScopedClusterClient; + soClient: SavedObjectsClientContract; + }>; +} + +export type StreamsRouteHandlerResources = RouteDependencies & DefaultRouteHandlerResources; diff --git a/x-pack/plugins/streams/server/types.ts b/x-pack/plugins/streams/server/types.ts new file mode 100644 index 0000000000000..f119faa0ed010 --- /dev/null +++ b/x-pack/plugins/streams/server/types.ts @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { CoreStart, ElasticsearchClient, Logger } from '@kbn/core/server'; +import { SecurityPluginStart } from '@kbn/security-plugin/server'; +import { + EncryptedSavedObjectsPluginSetup, + EncryptedSavedObjectsPluginStart, +} from '@kbn/encrypted-saved-objects-plugin/server'; +import { LicensingPluginStart } from '@kbn/licensing-plugin/server'; +import { + TaskManagerSetupContract, + TaskManagerStartContract, +} from '@kbn/task-manager-plugin/server'; +import { StreamsConfig } from '../common/config'; + +export interface StreamsServer { + core: CoreStart; + config: StreamsConfig; + logger: Logger; + security: SecurityPluginStart; + encryptedSavedObjects: EncryptedSavedObjectsPluginStart; + isServerless: boolean; + taskManager: TaskManagerStartContract; +} + +export interface ElasticsearchAccessorOptions { + elasticsearchClient: ElasticsearchClient; +} + +export interface StreamsPluginSetupDependencies { + encryptedSavedObjects: EncryptedSavedObjectsPluginSetup; + taskManager: TaskManagerSetupContract; +} + +export interface StreamsPluginStartDependencies { + security: SecurityPluginStart; + encryptedSavedObjects: EncryptedSavedObjectsPluginStart; + licensing: LicensingPluginStart; + taskManager: TaskManagerStartContract; +} diff --git a/x-pack/plugins/streams/tsconfig.json b/x-pack/plugins/streams/tsconfig.json new file mode 100644 index 0000000000000..c2fde35f9ca22 --- /dev/null +++ b/x-pack/plugins/streams/tsconfig.json @@ -0,0 +1,31 @@ +{ + "extends": "../../../tsconfig.base.json", + "compilerOptions": { + "outDir": "target/types" + }, + "include": [ + "../../../typings/**/*", + "common/**/*", + "server/**/*", + "public/**/*", + "types/**/*" + ], + "exclude": [ + "target/**/*" + ], + "kbn_references": [ + "@kbn/config-schema", + "@kbn/core", + "@kbn/logging", + "@kbn/core-plugins-server", + "@kbn/core-http-server", + "@kbn/security-plugin", + "@kbn/core-saved-objects-api-server", + "@kbn/core-elasticsearch-server", + "@kbn/task-manager-plugin", + "@kbn/server-route-repository", + "@kbn/zod", + "@kbn/encrypted-saved-objects-plugin", + "@kbn/licensing-plugin", + ] +} diff --git a/yarn.lock b/yarn.lock index 8795abae4cc38..cdc813bf0a1d3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7464,6 +7464,10 @@ version "0.0.0" uid "" +"@kbn/streams-plugin@link:x-pack/plugins/streams": + version "0.0.0" + uid "" + "@kbn/synthetics-e2e@link:x-pack/plugins/observability_solution/synthetics/e2e": version "0.0.0" uid ""