From 7190c9ceda233b5eb6c0cb784a10e21e4cc39b22 Mon Sep 17 00:00:00 2001 From: Chris Cowan Date: Wed, 13 Nov 2024 03:52:59 -0700 Subject: [PATCH] [Streams] Introducing the new Streams plugin (#198713) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary This PR introduces the new experimental "Streams" plugin into the Kibana project. The Streams project aims to simplify workflows around dealing with messy logs in Elasticsearch. Our current offering is either extremely opinionated with integrations or leaves the user alone with the high flexibility of Elasticsearch concepts like index templates, component templates and so on, which make it challenging to configure everything correctly for good performance and controlling search speed and cost. ### Scope of PR - Provides an API for the user to "enable" the streams framework which creates the "root" entity `logs` with all the backing Elasticsearch assets - Provides an API for the user to "fork" a stream - Provides an API for the user to "read" a stream and all of it's Elasticsearch assets. - Provides an API for the user to upsert a stream (and implicitly child streams that are mentioned) - Part of this API is placing grok and disscect processing steps as well as fields to the mapping - Implements the Stream Naming Schema (SNS) which uses dots to express the index patterns and stream IDs. Example: `logs.nginx.errors` - The APIs will fully manage the `index_template`, `component_template`, and `ingest_pipelines`. ### Out of scope - Integration tests (coming in a follow-up) ### Reviewer Notes - I haven't implemented tests beyond a unit test for converting the filter conditions to Painless. I wanted to get a PR up so we can start iterating on the interface and functionality before we invest in testing. - You might need to add `server.versioned.versionResolution: oldest` to your `config/kibana.dev.yaml` to play with the requests below in the Kibana "Dev console". ### Example API Calls Enable the root stream (and set the mapping for the internal `.streams` index) ``` POST kbn:/api/streams/_enable ``` Read the root entity "logs" ``` GET kbn:/api/streams/logs ``` Fork the "root" entity "logs" and create "logs.nginx" based on a condition ``` POST kbn:/api/streams/logs/_fork { "stream": { "id": "logs.nginx", "children": [], "processing": [], "fields": [], }, "condition": { "field": "log.logger", "operator": "eq", "value": "nginx_proxy" } } ``` Fork the entity "logs.nginx" and create "logs.nginx.errors" based on a condition ``` POST kbn:/api/streams/logs.nginx/_fork { "stream": { "id": "logs.nginx.error", "children": [], "processing": [], "fields": [], }, "condition": { "or": [ { "field": "log.level", "operator": "eq", "value": "error" }, { "field": "log.level", "operator": "eq", "value": "ERROR" } ] } } ``` Set some processing on a stream and map the generated field ``` PUT kbn:/api/streams/logs.nginx { "children": [], "processing": [ { "config": { "type": "grok", "patterns": ["^%{IP:ip} – –"], "field": "message" } } ], "fields": [ { "name": "ip", "type": "ip" } ], } } ``` Field definitions are checked for both descendants and ancestors for incompatibilities to ensure they stay additive. If children are defined in the `PUT /api/streams/` API, sub-streams are created implicitly. If a stream is `PUT`, it's added to the parent as well with a condition that is never true (can be edited subsequently). `POST /api/streams/_resync` can be used to re-sync all streams from their meta data in case the Elasticsearch objects got messed up by some external change - not sure whether we want to keep that. Follow-ups * API integration tests * Check read permissions on data streams to determine whether a user is allowed to read certain streams --------- Co-authored-by: Joe Reuter Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> (cherry picked from commit b86dc8102ad29656ac159f73a374f980897a2c88) # Conflicts: # .github/CODEOWNERS --- docs/developer/plugin-list.asciidoc | 4 + package.json | 1 + packages/kbn-optimizer/limits.yml | 1 + tsconfig.base.json | 2 + x-pack/plugins/streams/README.md | 3 + x-pack/plugins/streams/common/config.ts | 30 ++ x-pack/plugins/streams/common/constants.ts | 9 + x-pack/plugins/streams/common/types.ts | 91 ++++++ x-pack/plugins/streams/jest.config.js | 15 + x-pack/plugins/streams/kibana.jsonc | 28 ++ x-pack/plugins/streams/public/index.ts | 13 + x-pack/plugins/streams/public/plugin.ts | 32 ++ x-pack/plugins/streams/public/types.ts | 16 + x-pack/plugins/streams/server/index.ts | 19 ++ .../component_templates/generate_layer.ts | 43 +++ .../streams/component_templates/logs_layer.ts | 23 ++ .../manage_component_templates.ts | 47 +++ .../lib/streams/component_templates/name.ts | 10 + .../data_streams/manage_data_streams.ts | 93 ++++++ .../errors/component_template_not_found.ts | 13 + .../streams/errors/definition_id_invalid.ts | 13 + .../streams/errors/definition_not_found.ts | 13 + .../streams/errors/fork_condition_missing.ts | 13 + .../lib/streams/errors/id_conflict_error.ts | 13 + .../server/lib/streams/errors/index.ts | 15 + .../errors/index_template_not_found.ts | 13 + .../errors/ingest_pipeline_not_found.ts | 13 + .../lib/streams/errors/malformed_children.ts | 13 + .../lib/streams/errors/malformed_fields.ts | 13 + .../lib/streams/errors/malformed_stream_id.ts | 13 + .../lib/streams/errors/permission_denied.ts | 13 + .../lib/streams/errors/security_exception.ts | 13 + .../helpers/condition_to_painless.test.ts | 133 ++++++++ .../streams/helpers/condition_to_painless.ts | 83 +++++ .../server/lib/streams/helpers/hierarchy.ts | 35 +++ .../server/lib/streams/helpers/retry.ts | 58 ++++ .../generate_index_template.ts | 42 +++ .../index_templates/manage_index_templates.ts | 44 +++ .../lib/streams/index_templates/name.ts | 10 + .../generate_ingest_pipeline.ts | 42 +++ .../generate_reroute_pipeline.ts | 34 +++ .../ingest_pipelines/logs_default_pipeline.ts | 23 ++ .../manage_ingest_pipelines.ts | 48 +++ .../lib/streams/ingest_pipelines/name.ts | 14 + .../lib/streams/internal_stream_mapping.ts | 35 +++ .../lib/streams/root_stream_definition.ts | 32 ++ .../streams/server/lib/streams/stream_crud.ts | 286 ++++++++++++++++++ x-pack/plugins/streams/server/plugin.ts | 92 ++++++ .../server/routes/create_server_route.ts | 11 + x-pack/plugins/streams/server/routes/index.ts | 26 ++ .../streams/server/routes/streams/delete.ts | 109 +++++++ .../streams/server/routes/streams/edit.ts | 171 +++++++++++ .../streams/server/routes/streams/enable.ts | 48 +++ .../streams/server/routes/streams/fork.ts | 112 +++++++ .../streams/server/routes/streams/list.ts | 70 +++++ .../streams/server/routes/streams/read.ts | 60 ++++ .../streams/server/routes/streams/resync.ts | 47 +++ x-pack/plugins/streams/server/routes/types.ts | 22 ++ x-pack/plugins/streams/server/types.ts | 45 +++ x-pack/plugins/streams/tsconfig.json | 31 ++ yarn.lock | 4 + 61 files changed, 2418 insertions(+) create mode 100644 x-pack/plugins/streams/README.md create mode 100644 x-pack/plugins/streams/common/config.ts create mode 100644 x-pack/plugins/streams/common/constants.ts create mode 100644 x-pack/plugins/streams/common/types.ts create mode 100644 x-pack/plugins/streams/jest.config.js create mode 100644 x-pack/plugins/streams/kibana.jsonc create mode 100644 x-pack/plugins/streams/public/index.ts create mode 100644 x-pack/plugins/streams/public/plugin.ts create mode 100644 x-pack/plugins/streams/public/types.ts create mode 100644 x-pack/plugins/streams/server/index.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/component_templates/generate_layer.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/component_templates/logs_layer.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/component_templates/manage_component_templates.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/component_templates/name.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/component_template_not_found.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/definition_id_invalid.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/definition_not_found.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/fork_condition_missing.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/id_conflict_error.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/index.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/index_template_not_found.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/ingest_pipeline_not_found.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/malformed_children.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/malformed_fields.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/malformed_stream_id.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/permission_denied.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/errors/security_exception.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.test.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/helpers/hierarchy.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/helpers/retry.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/index_templates/generate_index_template.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/index_templates/manage_index_templates.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/index_templates/name.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/ingest_pipelines/logs_default_pipeline.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/ingest_pipelines/manage_ingest_pipelines.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/ingest_pipelines/name.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/internal_stream_mapping.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/root_stream_definition.ts create mode 100644 x-pack/plugins/streams/server/lib/streams/stream_crud.ts create mode 100644 x-pack/plugins/streams/server/plugin.ts create mode 100644 x-pack/plugins/streams/server/routes/create_server_route.ts create mode 100644 x-pack/plugins/streams/server/routes/index.ts create mode 100644 x-pack/plugins/streams/server/routes/streams/delete.ts create mode 100644 x-pack/plugins/streams/server/routes/streams/edit.ts create mode 100644 x-pack/plugins/streams/server/routes/streams/enable.ts create mode 100644 x-pack/plugins/streams/server/routes/streams/fork.ts create mode 100644 x-pack/plugins/streams/server/routes/streams/list.ts create mode 100644 x-pack/plugins/streams/server/routes/streams/read.ts create mode 100644 x-pack/plugins/streams/server/routes/streams/resync.ts create mode 100644 x-pack/plugins/streams/server/routes/types.ts create mode 100644 x-pack/plugins/streams/server/types.ts create mode 100644 x-pack/plugins/streams/tsconfig.json diff --git a/docs/developer/plugin-list.asciidoc b/docs/developer/plugin-list.asciidoc index 87f74ebdb7bb1..ea84b25dcda4d 100644 --- a/docs/developer/plugin-list.asciidoc +++ b/docs/developer/plugin-list.asciidoc @@ -909,6 +909,10 @@ routes, etc. |The stack_connectors plugin provides connector types shipped with Kibana, built on top of the framework provided in the actions plugin. +|{kib-repo}blob/{branch}/x-pack/plugins/streams/README.md[streams] +|This plugin provides an interface to manage streams + + |{kib-repo}blob/{branch}/x-pack/plugins/observability_solution/synthetics/README.md[synthetics] |The purpose of this plugin is to provide users of Heartbeat more visibility of what's happening in their infrastructure. diff --git a/package.json b/package.json index b208a28cdb0d2..a1bf3c22f6cd4 100644 --- a/package.json +++ b/package.json @@ -934,6 +934,7 @@ "@kbn/status-plugin-a-plugin": "link:test/server_integration/plugins/status_plugin_a", "@kbn/status-plugin-b-plugin": "link:test/server_integration/plugins/status_plugin_b", "@kbn/std": "link:packages/kbn-std", + "@kbn/streams-plugin": "link:x-pack/plugins/streams", "@kbn/synthetics-plugin": "link:x-pack/plugins/observability_solution/synthetics", "@kbn/synthetics-private-location": "link:x-pack/packages/kbn-synthetics-private-location", "@kbn/task-manager-fixture-plugin": "link:x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture", diff --git a/packages/kbn-optimizer/limits.yml b/packages/kbn-optimizer/limits.yml index 533e632813960..6bbef17fe6e6c 100644 --- a/packages/kbn-optimizer/limits.yml +++ b/packages/kbn-optimizer/limits.yml @@ -161,6 +161,7 @@ pageLoadAssetSize: spaces: 57868 stackAlerts: 58316 stackConnectors: 67227 + streams: 16742 synthetics: 55971 telemetry: 51957 telemetryManagementSection: 38586 diff --git a/tsconfig.base.json b/tsconfig.base.json index 3880224e8601c..9b72acee0fcd0 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -1844,6 +1844,8 @@ "@kbn/stdio-dev-helpers/*": ["packages/kbn-stdio-dev-helpers/*"], "@kbn/storybook": ["packages/kbn-storybook"], "@kbn/storybook/*": ["packages/kbn-storybook/*"], + "@kbn/streams-plugin": ["x-pack/plugins/streams"], + "@kbn/streams-plugin/*": ["x-pack/plugins/streams/*"], "@kbn/synthetics-e2e": ["x-pack/plugins/observability_solution/synthetics/e2e"], "@kbn/synthetics-e2e/*": ["x-pack/plugins/observability_solution/synthetics/e2e/*"], "@kbn/synthetics-plugin": ["x-pack/plugins/observability_solution/synthetics"], diff --git a/x-pack/plugins/streams/README.md b/x-pack/plugins/streams/README.md new file mode 100644 index 0000000000000..9a3539a33535d --- /dev/null +++ b/x-pack/plugins/streams/README.md @@ -0,0 +1,3 @@ +# Streams Plugin + +This plugin provides an interface to manage streams \ No newline at end of file diff --git a/x-pack/plugins/streams/common/config.ts b/x-pack/plugins/streams/common/config.ts new file mode 100644 index 0000000000000..3371b1b6d8cbc --- /dev/null +++ b/x-pack/plugins/streams/common/config.ts @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { schema, TypeOf } from '@kbn/config-schema'; + +export const configSchema = schema.object({}); + +export type StreamsConfig = TypeOf; + +/** + * The following map is passed to the server plugin setup under the + * exposeToBrowser: option, and controls which of the above config + * keys are allow-listed to be available in the browser config. + * + * NOTE: anything exposed here will be visible in the UI dev tools, + * and therefore MUST NOT be anything that is sensitive information! + */ +export const exposeToBrowserConfig = {} as const; + +type ValidKeys = keyof { + [K in keyof typeof exposeToBrowserConfig as (typeof exposeToBrowserConfig)[K] extends true + ? K + : never]: true; +}; + +export type StreamsPublicConfig = Pick; diff --git a/x-pack/plugins/streams/common/constants.ts b/x-pack/plugins/streams/common/constants.ts new file mode 100644 index 0000000000000..d7595990ded6e --- /dev/null +++ b/x-pack/plugins/streams/common/constants.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const ASSET_VERSION = 1; +export const STREAMS_INDEX = '.kibana_streams'; diff --git a/x-pack/plugins/streams/common/types.ts b/x-pack/plugins/streams/common/types.ts new file mode 100644 index 0000000000000..6cdb2f923f6f4 --- /dev/null +++ b/x-pack/plugins/streams/common/types.ts @@ -0,0 +1,91 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; + +const stringOrNumberOrBoolean = z.union([z.string(), z.number(), z.boolean()]); + +export const filterConditionSchema = z.object({ + field: z.string(), + operator: z.enum(['eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'contains', 'startsWith', 'endsWith']), + value: stringOrNumberOrBoolean, +}); + +export type FilterCondition = z.infer; + +export interface AndCondition { + and: Condition[]; +} + +export interface RerouteOrCondition { + or: Condition[]; +} + +export type Condition = FilterCondition | AndCondition | RerouteOrCondition | undefined; + +export const conditionSchema: z.ZodType = z.lazy(() => + z.union([ + filterConditionSchema, + z.object({ and: z.array(conditionSchema) }), + z.object({ or: z.array(conditionSchema) }), + ]) +); + +export const grokProcessingDefinitionSchema = z.object({ + type: z.literal('grok'), + field: z.string(), + patterns: z.array(z.string()), + pattern_definitions: z.optional(z.record(z.string())), +}); + +export const dissectProcessingDefinitionSchema = z.object({ + type: z.literal('dissect'), + field: z.string(), + pattern: z.string(), +}); + +export const processingDefinitionSchema = z.object({ + condition: z.optional(conditionSchema), + config: z.discriminatedUnion('type', [ + grokProcessingDefinitionSchema, + dissectProcessingDefinitionSchema, + ]), +}); + +export type ProcessingDefinition = z.infer; + +export const fieldDefinitionSchema = z.object({ + name: z.string(), + type: z.enum(['keyword', 'match_only_text', 'long', 'double', 'date', 'boolean', 'ip']), +}); + +export type FieldDefinition = z.infer; + +export const streamWithoutIdDefinitonSchema = z.object({ + processing: z.array(processingDefinitionSchema).default([]), + fields: z.array(fieldDefinitionSchema).default([]), + children: z + .array( + z.object({ + id: z.string(), + condition: conditionSchema, + }) + ) + .default([]), +}); + +export type StreamWithoutIdDefinition = z.infer; + +export const streamDefinitonSchema = streamWithoutIdDefinitonSchema.extend({ + id: z.string(), +}); + +export type StreamDefinition = z.infer; + +export const streamDefinitonWithoutChildrenSchema = streamDefinitonSchema.omit({ children: true }); + +export type StreamWithoutChildrenDefinition = z.infer; diff --git a/x-pack/plugins/streams/jest.config.js b/x-pack/plugins/streams/jest.config.js new file mode 100644 index 0000000000000..43d4fd28da9b5 --- /dev/null +++ b/x-pack/plugins/streams/jest.config.js @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +module.exports = { + preset: '@kbn/test', + rootDir: '../../..', + roots: ['/x-pack/plugins/streams'], + coverageDirectory: '/target/kibana-coverage/jest/x-pack/plugins/streams', + coverageReporters: ['text', 'html'], + collectCoverageFrom: ['/x-pack/plugins/streams/{common,public,server}/**/*.{js,ts,tsx}'], +}; diff --git a/x-pack/plugins/streams/kibana.jsonc b/x-pack/plugins/streams/kibana.jsonc new file mode 100644 index 0000000000000..06c37ed245cf1 --- /dev/null +++ b/x-pack/plugins/streams/kibana.jsonc @@ -0,0 +1,28 @@ +{ + "type": "plugin", + "id": "@kbn/streams-plugin", + "owner": "@simianhacker @flash1293 @dgieselaar", + "description": "A manager for Streams", + "group": "observability", + "visibility": "private", + "plugin": { + "id": "streams", + "configPath": ["xpack", "streams"], + "browser": true, + "server": true, + "requiredPlugins": [ + "data", + "security", + "encryptedSavedObjects", + "usageCollection", + "licensing", + "taskManager" + ], + "optionalPlugins": [ + "cloud", + "serverless" + ], + "requiredBundles": [ + ] + } +} diff --git a/x-pack/plugins/streams/public/index.ts b/x-pack/plugins/streams/public/index.ts new file mode 100644 index 0000000000000..5b83ea1d297d3 --- /dev/null +++ b/x-pack/plugins/streams/public/index.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { PluginInitializer, PluginInitializerContext } from '@kbn/core/public'; +import { Plugin } from './plugin'; + +export const plugin: PluginInitializer<{}, {}> = (context: PluginInitializerContext) => { + return new Plugin(context); +}; diff --git a/x-pack/plugins/streams/public/plugin.ts b/x-pack/plugins/streams/public/plugin.ts new file mode 100644 index 0000000000000..f35d18e06ff70 --- /dev/null +++ b/x-pack/plugins/streams/public/plugin.ts @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { CoreSetup, CoreStart, PluginInitializerContext } from '@kbn/core/public'; +import { Logger } from '@kbn/logging'; + +import type { StreamsPublicConfig } from '../common/config'; +import { StreamsPluginClass, StreamsPluginSetup, StreamsPluginStart } from './types'; + +export class Plugin implements StreamsPluginClass { + public config: StreamsPublicConfig; + public logger: Logger; + + constructor(context: PluginInitializerContext<{}>) { + this.config = context.config.get(); + this.logger = context.logger.get(); + } + + setup(core: CoreSetup, pluginSetup: StreamsPluginSetup) { + return {}; + } + + start(core: CoreStart) { + return {}; + } + + stop() {} +} diff --git a/x-pack/plugins/streams/public/types.ts b/x-pack/plugins/streams/public/types.ts new file mode 100644 index 0000000000000..61e5fa94098f0 --- /dev/null +++ b/x-pack/plugins/streams/public/types.ts @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Plugin as PluginClass } from '@kbn/core/public'; + +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface StreamsPluginSetup {} + +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface StreamsPluginStart {} + +export type StreamsPluginClass = PluginClass<{}, {}, StreamsPluginSetup, StreamsPluginStart>; diff --git a/x-pack/plugins/streams/server/index.ts b/x-pack/plugins/streams/server/index.ts new file mode 100644 index 0000000000000..bd8aee304ad15 --- /dev/null +++ b/x-pack/plugins/streams/server/index.ts @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { PluginInitializerContext } from '@kbn/core-plugins-server'; +import { StreamsConfig } from '../common/config'; +import { StreamsPluginSetup, StreamsPluginStart, config } from './plugin'; +import { StreamsRouteRepository } from './routes'; + +export type { StreamsConfig, StreamsPluginSetup, StreamsPluginStart, StreamsRouteRepository }; +export { config }; + +export const plugin = async (context: PluginInitializerContext) => { + const { StreamsPlugin } = await import('./plugin'); + return new StreamsPlugin(context); +}; diff --git a/x-pack/plugins/streams/server/lib/streams/component_templates/generate_layer.ts b/x-pack/plugins/streams/server/lib/streams/component_templates/generate_layer.ts new file mode 100644 index 0000000000000..82c89c9ab9171 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/component_templates/generate_layer.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + ClusterPutComponentTemplateRequest, + MappingProperty, +} from '@elastic/elasticsearch/lib/api/types'; +import { StreamDefinition } from '../../../../common/types'; +import { ASSET_VERSION } from '../../../../common/constants'; +import { logsSettings } from './logs_layer'; +import { isRoot } from '../helpers/hierarchy'; +import { getComponentTemplateName } from './name'; + +export function generateLayer( + id: string, + definition: StreamDefinition +): ClusterPutComponentTemplateRequest { + const properties: Record = {}; + definition.fields.forEach((field) => { + properties[field.name] = { + type: field.type, + }; + }); + return { + name: getComponentTemplateName(id), + template: { + settings: isRoot(definition.id) ? logsSettings : {}, + mappings: { + subobjects: false, + properties, + }, + }, + version: ASSET_VERSION, + _meta: { + managed: true, + description: `Default settings for the ${id} stream`, + }, + }; +} diff --git a/x-pack/plugins/streams/server/lib/streams/component_templates/logs_layer.ts b/x-pack/plugins/streams/server/lib/streams/component_templates/logs_layer.ts new file mode 100644 index 0000000000000..6b41d04131c56 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/component_templates/logs_layer.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IndicesIndexSettings } from '@elastic/elasticsearch/lib/api/types'; + +export const logsSettings: IndicesIndexSettings = { + index: { + lifecycle: { + name: 'logs', + }, + codec: 'best_compression', + mapping: { + total_fields: { + ignore_dynamic_beyond_limit: true, + }, + ignore_malformed: true, + }, + }, +}; diff --git a/x-pack/plugins/streams/server/lib/streams/component_templates/manage_component_templates.ts b/x-pack/plugins/streams/server/lib/streams/component_templates/manage_component_templates.ts new file mode 100644 index 0000000000000..a7d707a4ce42a --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/component_templates/manage_component_templates.ts @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import { Logger } from '@kbn/logging'; +import { ClusterPutComponentTemplateRequest } from '@elastic/elasticsearch/lib/api/types'; +import { retryTransientEsErrors } from '../helpers/retry'; + +interface DeleteComponentOptions { + esClient: ElasticsearchClient; + name: string; + logger: Logger; +} + +interface ComponentManagementOptions { + esClient: ElasticsearchClient; + component: ClusterPutComponentTemplateRequest; + logger: Logger; +} + +export async function deleteComponent({ esClient, name, logger }: DeleteComponentOptions) { + try { + await retryTransientEsErrors( + () => esClient.cluster.deleteComponentTemplate({ name }, { ignore: [404] }), + { logger } + ); + } catch (error: any) { + logger.error(`Error deleting component template: ${error.message}`); + throw error; + } +} + +export async function upsertComponent({ esClient, component, logger }: ComponentManagementOptions) { + try { + await retryTransientEsErrors(() => esClient.cluster.putComponentTemplate(component), { + logger, + }); + logger.debug(() => `Installed component template: ${JSON.stringify(component)}`); + } catch (error: any) { + logger.error(`Error updating component template: ${error.message}`); + throw error; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/component_templates/name.ts b/x-pack/plugins/streams/server/lib/streams/component_templates/name.ts new file mode 100644 index 0000000000000..6ea05b9a53b28 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/component_templates/name.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export function getComponentTemplateName(id: string) { + return `${id}@stream.layer`; +} diff --git a/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts b/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts new file mode 100644 index 0000000000000..812739db56c73 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/data_streams/manage_data_streams.ts @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ElasticsearchClient, Logger } from '@kbn/core/server'; +import { retryTransientEsErrors } from '../helpers/retry'; + +interface DataStreamManagementOptions { + esClient: ElasticsearchClient; + name: string; + logger: Logger; +} + +interface DeleteDataStreamOptions { + esClient: ElasticsearchClient; + name: string; + logger: Logger; +} + +interface RolloverDataStreamOptions { + esClient: ElasticsearchClient; + name: string; + logger: Logger; +} + +export async function upsertDataStream({ esClient, name, logger }: DataStreamManagementOptions) { + const dataStreamExists = await esClient.indices.exists({ index: name }); + if (dataStreamExists) { + return; + } + try { + await retryTransientEsErrors(() => esClient.indices.createDataStream({ name }), { logger }); + logger.debug(() => `Installed data stream: ${name}`); + } catch (error: any) { + logger.error(`Error creating data stream: ${error.message}`); + throw error; + } +} + +export async function deleteDataStream({ esClient, name, logger }: DeleteDataStreamOptions) { + try { + await retryTransientEsErrors( + () => esClient.indices.deleteDataStream({ name }, { ignore: [404] }), + { logger } + ); + } catch (error: any) { + logger.error(`Error deleting data stream: ${error.message}`); + throw error; + } +} + +export async function rolloverDataStreamIfNecessary({ + esClient, + name, + logger, +}: RolloverDataStreamOptions) { + const dataStreams = await esClient.indices.getDataStream({ name: `${name},${name}.*` }); + for (const dataStream of dataStreams.data_streams) { + const currentMappings = + Object.values( + await esClient.indices.getMapping({ + index: dataStream.indices.at(-1)?.index_name, + }) + )[0].mappings.properties || {}; + const simulatedIndex = await esClient.indices.simulateIndexTemplate({ name: dataStream.name }); + const simulatedMappings = simulatedIndex.template.mappings.properties || {}; + + // check whether the same fields and same types are listed (don't check for other mapping attributes) + const isDifferent = + Object.values(simulatedMappings).length !== Object.values(currentMappings).length || + Object.entries(simulatedMappings || {}).some(([fieldName, { type }]) => { + const currentType = currentMappings[fieldName]?.type; + return currentType !== type; + }); + + if (!isDifferent) { + continue; + } + + try { + await retryTransientEsErrors(() => esClient.indices.rollover({ alias: dataStream.name }), { + logger, + }); + logger.debug(() => `Rolled over data stream: ${dataStream.name}`); + } catch (error: any) { + logger.error(`Error rolling over data stream: ${error.message}`); + throw error; + } + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/component_template_not_found.ts b/x-pack/plugins/streams/server/lib/streams/errors/component_template_not_found.ts new file mode 100644 index 0000000000000..a7e9cebf98507 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/component_template_not_found.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class ComponentTemplateNotFound extends Error { + constructor(message: string) { + super(message); + this.name = 'ComponentTemplateNotFound'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/definition_id_invalid.ts b/x-pack/plugins/streams/server/lib/streams/errors/definition_id_invalid.ts new file mode 100644 index 0000000000000..817e8f67bf25d --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/definition_id_invalid.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class DefinitionIdInvalid extends Error { + constructor(message: string) { + super(message); + this.name = 'DefinitionIdInvalid'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/definition_not_found.ts b/x-pack/plugins/streams/server/lib/streams/errors/definition_not_found.ts new file mode 100644 index 0000000000000..f7e60193baa5f --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/definition_not_found.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class DefinitionNotFound extends Error { + constructor(message: string) { + super(message); + this.name = 'DefinitionNotFound'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/fork_condition_missing.ts b/x-pack/plugins/streams/server/lib/streams/errors/fork_condition_missing.ts new file mode 100644 index 0000000000000..713751dbe4363 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/fork_condition_missing.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class ForkConditionMissing extends Error { + constructor(message: string) { + super(message); + this.name = 'ForkConditionMissing'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/id_conflict_error.ts b/x-pack/plugins/streams/server/lib/streams/errors/id_conflict_error.ts new file mode 100644 index 0000000000000..a24c7357379fa --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/id_conflict_error.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class IdConflict extends Error { + constructor(message: string) { + super(message); + this.name = 'IdConflict'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/index.ts b/x-pack/plugins/streams/server/lib/streams/errors/index.ts new file mode 100644 index 0000000000000..73842ef3018fe --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/index.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './definition_id_invalid'; +export * from './definition_not_found'; +export * from './id_conflict_error'; +export * from './permission_denied'; +export * from './security_exception'; +export * from './index_template_not_found'; +export * from './fork_condition_missing'; +export * from './component_template_not_found'; diff --git a/x-pack/plugins/streams/server/lib/streams/errors/index_template_not_found.ts b/x-pack/plugins/streams/server/lib/streams/errors/index_template_not_found.ts new file mode 100644 index 0000000000000..4f4735dd15fa1 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/index_template_not_found.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class IndexTemplateNotFound extends Error { + constructor(message: string) { + super(message); + this.name = 'IndexTemplateNotFound'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/ingest_pipeline_not_found.ts b/x-pack/plugins/streams/server/lib/streams/errors/ingest_pipeline_not_found.ts new file mode 100644 index 0000000000000..8bf9bbd4933ce --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/ingest_pipeline_not_found.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class IngestPipelineNotFound extends Error { + constructor(message: string) { + super(message); + this.name = 'IngestPipelineNotFound'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/malformed_children.ts b/x-pack/plugins/streams/server/lib/streams/errors/malformed_children.ts new file mode 100644 index 0000000000000..699c4cdd5b1ef --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/malformed_children.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class MalformedChildren extends Error { + constructor(message: string) { + super(message); + this.name = 'MalformedChildren'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/malformed_fields.ts b/x-pack/plugins/streams/server/lib/streams/errors/malformed_fields.ts new file mode 100644 index 0000000000000..b8f7ac1392610 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/malformed_fields.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class MalformedFields extends Error { + constructor(message: string) { + super(message); + this.name = 'MalformedFields'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/malformed_stream_id.ts b/x-pack/plugins/streams/server/lib/streams/errors/malformed_stream_id.ts new file mode 100644 index 0000000000000..2f988204c74b0 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/malformed_stream_id.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class MalformedStreamId extends Error { + constructor(message: string) { + super(message); + this.name = 'MalformedStreamId'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/permission_denied.ts b/x-pack/plugins/streams/server/lib/streams/errors/permission_denied.ts new file mode 100644 index 0000000000000..f0133e28063ca --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/permission_denied.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class PermissionDenied extends Error { + constructor(message: string) { + super(message); + this.name = 'PermissionDenied'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/errors/security_exception.ts b/x-pack/plugins/streams/server/lib/streams/errors/security_exception.ts new file mode 100644 index 0000000000000..0b4ae450c2530 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/errors/security_exception.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class SecurityException extends Error { + constructor(message: string) { + super(message); + this.name = 'SecurityException'; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.test.ts b/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.test.ts new file mode 100644 index 0000000000000..aab7f27f12d14 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.test.ts @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { conditionToPainless } from './condition_to_painless'; + +const operatorConditionAndResutls = [ + { + condition: { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' }, + result: 'ctx.log?.logger == "nginx_proxy"', + }, + { + condition: { field: 'log.logger', operator: 'neq' as const, value: 'nginx_proxy' }, + result: 'ctx.log?.logger != "nginx_proxy"', + }, + { + condition: { field: 'http.response.status_code', operator: 'lt' as const, value: 500 }, + result: 'ctx.http?.response?.status_code < 500', + }, + { + condition: { field: 'http.response.status_code', operator: 'lte' as const, value: 500 }, + result: 'ctx.http?.response?.status_code <= 500', + }, + { + condition: { field: 'http.response.status_code', operator: 'gt' as const, value: 500 }, + result: 'ctx.http?.response?.status_code > 500', + }, + { + condition: { field: 'http.response.status_code', operator: 'gte' as const, value: 500 }, + result: 'ctx.http?.response?.status_code >= 500', + }, + { + condition: { field: 'log.logger', operator: 'startsWith' as const, value: 'nginx' }, + result: 'ctx.log?.logger.startsWith("nginx")', + }, + { + condition: { field: 'log.logger', operator: 'endsWith' as const, value: 'proxy' }, + result: 'ctx.log?.logger.endsWith("proxy")', + }, + { + condition: { field: 'log.logger', operator: 'contains' as const, value: 'proxy' }, + result: 'ctx.log?.logger.contains("proxy")', + }, +]; + +describe('conditionToPainless', () => { + describe('operators', () => { + operatorConditionAndResutls.forEach((setup) => { + test(`${setup.condition.operator}`, () => { + expect(conditionToPainless(setup.condition)).toEqual(setup.result); + }); + }); + }); + + describe('and', () => { + test('simple', () => { + const condition = { + and: [ + { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' }, + { field: 'log.level', operator: 'eq' as const, value: 'error' }, + ], + }; + expect( + expect(conditionToPainless(condition)).toEqual( + 'ctx.log?.logger == "nginx_proxy" && ctx.log?.level == "error"' + ) + ); + }); + }); + + describe('or', () => { + test('simple', () => { + const condition = { + or: [ + { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' }, + { field: 'log.level', operator: 'eq' as const, value: 'error' }, + ], + }; + expect( + expect(conditionToPainless(condition)).toEqual( + 'ctx.log?.logger == "nginx_proxy" || ctx.log?.level == "error"' + ) + ); + }); + }); + + describe('nested', () => { + test('and with a filter and or with 2 filters', () => { + const condition = { + and: [ + { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' }, + { + or: [ + { field: 'log.level', operator: 'eq' as const, value: 'error' }, + { field: 'log.level', operator: 'eq' as const, value: 'ERROR' }, + ], + }, + ], + }; + expect( + expect(conditionToPainless(condition)).toEqual( + 'ctx.log?.logger == "nginx_proxy" && (ctx.log?.level == "error" || ctx.log?.level == "ERROR")' + ) + ); + }); + test('and with 2 or with filters', () => { + const condition = { + and: [ + { + or: [ + { field: 'log.logger', operator: 'eq' as const, value: 'nginx_proxy' }, + { field: 'service.name', operator: 'eq' as const, value: 'nginx' }, + ], + }, + { + or: [ + { field: 'log.level', operator: 'eq' as const, value: 'error' }, + { field: 'log.level', operator: 'eq' as const, value: 'ERROR' }, + ], + }, + ], + }; + expect( + expect(conditionToPainless(condition)).toEqual( + '(ctx.log?.logger == "nginx_proxy" || ctx.service?.name == "nginx") && (ctx.log?.level == "error" || ctx.log?.level == "ERROR")' + ) + ); + }); + }); +}); diff --git a/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts b/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts new file mode 100644 index 0000000000000..539ad3603535b --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { isBoolean, isString } from 'lodash'; +import { + AndCondition, + Condition, + conditionSchema, + FilterCondition, + filterConditionSchema, + RerouteOrCondition, +} from '../../../../common/types'; + +function isFilterCondition(subject: any): subject is FilterCondition { + const result = filterConditionSchema.safeParse(subject); + return result.success; +} + +function isAndCondition(subject: any): subject is AndCondition { + const result = conditionSchema.safeParse(subject); + return result.success && subject.and != null; +} + +function isOrCondition(subject: any): subject is RerouteOrCondition { + const result = conditionSchema.safeParse(subject); + return result.success && subject.or != null; +} + +function safePainlessField(condition: FilterCondition) { + return `ctx.${condition.field.split('.').join('?.')}`; +} + +function encodeValue(value: string | number | boolean) { + if (isString(value)) { + return `"${value}"`; + } + if (isBoolean(value)) { + return value ? 'true' : 'false'; + } + return value; +} + +function toPainless(condition: FilterCondition) { + switch (condition.operator) { + case 'neq': + return `${safePainlessField(condition)} != ${encodeValue(condition.value)}`; + case 'lt': + return `${safePainlessField(condition)} < ${encodeValue(condition.value)}`; + case 'lte': + return `${safePainlessField(condition)} <= ${encodeValue(condition.value)}`; + case 'gt': + return `${safePainlessField(condition)} > ${encodeValue(condition.value)}`; + case 'gte': + return `${safePainlessField(condition)} >= ${encodeValue(condition.value)}`; + case 'startsWith': + return `${safePainlessField(condition)}.startsWith(${encodeValue(condition.value)})`; + case 'endsWith': + return `${safePainlessField(condition)}.endsWith(${encodeValue(condition.value)})`; + case 'contains': + return `${safePainlessField(condition)}.contains(${encodeValue(condition.value)})`; + default: + return `${safePainlessField(condition)} == ${encodeValue(condition.value)}`; + } +} + +export function conditionToPainless(condition: Condition, nested = false): string { + if (isFilterCondition(condition)) { + return toPainless(condition); + } + if (isAndCondition(condition)) { + const and = condition.and.map((filter) => conditionToPainless(filter, true)).join(' && '); + return nested ? `(${and})` : and; + } + if (isOrCondition(condition)) { + const or = condition.or.map((filter) => conditionToPainless(filter, true)).join(' || '); + return nested ? `(${or})` : or; + } + return 'false'; +} diff --git a/x-pack/plugins/streams/server/lib/streams/helpers/hierarchy.ts b/x-pack/plugins/streams/server/lib/streams/helpers/hierarchy.ts new file mode 100644 index 0000000000000..6f1cd308f3c3d --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/helpers/hierarchy.ts @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StreamDefinition } from '../../../../common/types'; + +export function isDescendandOf(parent: StreamDefinition, child: StreamDefinition) { + return child.id.startsWith(parent.id); +} + +export function isChildOf(parent: StreamDefinition, child: StreamDefinition) { + return ( + isDescendandOf(parent, child) && child.id.split('.').length === parent.id.split('.').length + 1 + ); +} + +export function getParentId(id: string) { + const parts = id.split('.'); + if (parts.length === 1) { + return undefined; + } + return parts.slice(0, parts.length - 1).join('.'); +} + +export function isRoot(id: string) { + return id.split('.').length === 1; +} + +export function getAncestors(id: string) { + const parts = id.split('.'); + return parts.slice(0, parts.length - 1).map((_, index) => parts.slice(0, index + 1).join('.')); +} diff --git a/x-pack/plugins/streams/server/lib/streams/helpers/retry.ts b/x-pack/plugins/streams/server/lib/streams/helpers/retry.ts new file mode 100644 index 0000000000000..32604a22bf9be --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/helpers/retry.ts @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { setTimeout } from 'timers/promises'; +import { errors as EsErrors } from '@elastic/elasticsearch'; +import type { Logger } from '@kbn/logging'; +import { SecurityException } from '../errors'; + +const MAX_ATTEMPTS = 5; + +const retryResponseStatuses = [ + 503, // ServiceUnavailable + 408, // RequestTimeout + 410, // Gone +]; + +const isRetryableError = (e: any) => + e instanceof EsErrors.NoLivingConnectionsError || + e instanceof EsErrors.ConnectionError || + e instanceof EsErrors.TimeoutError || + (e instanceof EsErrors.ResponseError && retryResponseStatuses.includes(e?.statusCode!)); + +/** + * Retries any transient network or configuration issues encountered from Elasticsearch with an exponential backoff. + * Should only be used to wrap operations that are idempotent and can be safely executed more than once. + */ +export const retryTransientEsErrors = async ( + esCall: () => Promise, + { logger, attempt = 0 }: { logger?: Logger; attempt?: number } = {} +): Promise => { + try { + return await esCall(); + } catch (e) { + if (attempt < MAX_ATTEMPTS && isRetryableError(e)) { + const retryCount = attempt + 1; + const retryDelaySec = Math.min(Math.pow(2, retryCount), 64); // 2s, 4s, 8s, 16s, 32s, 64s, 64s, 64s ... + + logger?.warn( + `Retrying Elasticsearch operation after [${retryDelaySec}s] due to error: ${e.toString()} ${ + e.stack + }` + ); + + await setTimeout(retryDelaySec * 1000); + return retryTransientEsErrors(esCall, { logger, attempt: retryCount }); + } + + if (e.meta?.body?.error?.type === 'security_exception') { + throw new SecurityException(e.meta.body.error.reason); + } + + throw e; + } +}; diff --git a/x-pack/plugins/streams/server/lib/streams/index_templates/generate_index_template.ts b/x-pack/plugins/streams/server/lib/streams/index_templates/generate_index_template.ts new file mode 100644 index 0000000000000..7a16534a618da --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/index_templates/generate_index_template.ts @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ASSET_VERSION } from '../../../../common/constants'; +import { getProcessingPipelineName } from '../ingest_pipelines/name'; +import { getIndexTemplateName } from './name'; + +export function generateIndexTemplate(id: string) { + const composedOf = id.split('.').reduce((acc, _, index, array) => { + const parent = array.slice(0, index + 1).join('.'); + return [...acc, `${parent}@stream.layer`]; + }, [] as string[]); + + return { + name: getIndexTemplateName(id), + index_patterns: [id], + composed_of: composedOf, + priority: 200, + version: ASSET_VERSION, + _meta: { + managed: true, + description: `The index template for ${id} stream`, + }, + data_stream: { + hidden: false, + }, + template: { + settings: { + index: { + default_pipeline: getProcessingPipelineName(id), + }, + }, + }, + allow_auto_create: true, + // ignore missing component templates to be more robust against out-of-order syncs + ignore_missing_component_templates: composedOf, + }; +} diff --git a/x-pack/plugins/streams/server/lib/streams/index_templates/manage_index_templates.ts b/x-pack/plugins/streams/server/lib/streams/index_templates/manage_index_templates.ts new file mode 100644 index 0000000000000..9383e698b3436 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/index_templates/manage_index_templates.ts @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IndicesPutIndexTemplateRequest } from '@elastic/elasticsearch/lib/api/types'; +import { ElasticsearchClient, Logger } from '@kbn/core/server'; +import { retryTransientEsErrors } from '../helpers/retry'; + +interface TemplateManagementOptions { + esClient: ElasticsearchClient; + template: IndicesPutIndexTemplateRequest; + logger: Logger; +} + +interface DeleteTemplateOptions { + esClient: ElasticsearchClient; + name: string; + logger: Logger; +} + +export async function upsertTemplate({ esClient, template, logger }: TemplateManagementOptions) { + try { + await retryTransientEsErrors(() => esClient.indices.putIndexTemplate(template), { logger }); + logger.debug(() => `Installed index template: ${JSON.stringify(template)}`); + } catch (error: any) { + logger.error(`Error updating index template: ${error.message}`); + throw error; + } +} + +export async function deleteTemplate({ esClient, name, logger }: DeleteTemplateOptions) { + try { + await retryTransientEsErrors( + () => esClient.indices.deleteIndexTemplate({ name }, { ignore: [404] }), + { logger } + ); + } catch (error: any) { + logger.error(`Error deleting index template: ${error.message}`); + throw error; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/index_templates/name.ts b/x-pack/plugins/streams/server/lib/streams/index_templates/name.ts new file mode 100644 index 0000000000000..ec8ea5519a6b4 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/index_templates/name.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export function getIndexTemplateName(id: string) { + return `${id}@stream`; +} diff --git a/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts new file mode 100644 index 0000000000000..eb09df8831304 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StreamDefinition } from '../../../../common/types'; +import { ASSET_VERSION } from '../../../../common/constants'; +import { conditionToPainless } from '../helpers/condition_to_painless'; +import { logsDefaultPipelineProcessors } from './logs_default_pipeline'; +import { isRoot } from '../helpers/hierarchy'; +import { getProcessingPipelineName } from './name'; + +export function generateIngestPipeline(id: string, definition: StreamDefinition) { + return { + id: getProcessingPipelineName(id), + processors: [ + ...(isRoot(definition.id) ? logsDefaultPipelineProcessors : []), + ...definition.processing.map((processor) => { + const { type, ...config } = processor.config; + return { + [type]: { + ...config, + if: processor.condition ? conditionToPainless(processor.condition) : undefined, + }, + }; + }), + { + pipeline: { + name: `${id}@stream.reroutes`, + ignore_missing_pipeline: true, + }, + }, + ], + _meta: { + description: `Default pipeline for the ${id} stream`, + managed: true, + }, + version: ASSET_VERSION, + }; +} diff --git a/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts new file mode 100644 index 0000000000000..9b46e0cf4ac92 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StreamDefinition } from '../../../../common/types'; +import { ASSET_VERSION } from '../../../../common/constants'; +import { conditionToPainless } from '../helpers/condition_to_painless'; +import { getReroutePipelineName } from './name'; + +interface GenerateReroutePipelineParams { + definition: StreamDefinition; +} + +export async function generateReroutePipeline({ definition }: GenerateReroutePipelineParams) { + return { + id: getReroutePipelineName(definition.id), + processors: definition.children.map((child) => { + return { + reroute: { + destination: child.id, + if: conditionToPainless(child.condition), + }, + }; + }), + _meta: { + description: `Reoute pipeline for the ${definition.id} stream`, + managed: true, + }, + version: ASSET_VERSION, + }; +} diff --git a/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/logs_default_pipeline.ts b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/logs_default_pipeline.ts new file mode 100644 index 0000000000000..762155ba5047c --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/logs_default_pipeline.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const logsDefaultPipelineProcessors = [ + { + set: { + description: "If '@timestamp' is missing, set it with the ingest timestamp", + field: '@timestamp', + override: false, + copy_from: '_ingest.timestamp', + }, + }, + { + pipeline: { + name: 'logs@json-pipeline', + ignore_missing_pipeline: true, + }, + }, +]; diff --git a/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/manage_ingest_pipelines.ts b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/manage_ingest_pipelines.ts new file mode 100644 index 0000000000000..467e2efb48f0d --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/manage_ingest_pipelines.ts @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; +import { Logger } from '@kbn/logging'; +import { IngestPutPipelineRequest } from '@elastic/elasticsearch/lib/api/types'; +import { retryTransientEsErrors } from '../helpers/retry'; + +interface DeletePipelineOptions { + esClient: ElasticsearchClient; + id: string; + logger: Logger; +} + +interface PipelineManagementOptions { + esClient: ElasticsearchClient; + pipeline: IngestPutPipelineRequest; + logger: Logger; +} + +export async function deleteIngestPipeline({ esClient, id, logger }: DeletePipelineOptions) { + try { + await retryTransientEsErrors(() => esClient.ingest.deletePipeline({ id }, { ignore: [404] }), { + logger, + }); + } catch (error: any) { + logger.error(`Error deleting ingest pipeline: ${error.message}`); + throw error; + } +} + +export async function upsertIngestPipeline({ + esClient, + pipeline, + logger, +}: PipelineManagementOptions) { + try { + await retryTransientEsErrors(() => esClient.ingest.putPipeline(pipeline), { logger }); + logger.debug(() => `Installed index template: ${JSON.stringify(pipeline)}`); + } catch (error: any) { + logger.error(`Error updating index template: ${error.message}`); + throw error; + } +} diff --git a/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/name.ts b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/name.ts new file mode 100644 index 0000000000000..8d2a97ff3137f --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/ingest_pipelines/name.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export function getProcessingPipelineName(id: string) { + return `${id}@stream.processing`; +} + +export function getReroutePipelineName(id: string) { + return `${id}@stream.reroutes`; +} diff --git a/x-pack/plugins/streams/server/lib/streams/internal_stream_mapping.ts b/x-pack/plugins/streams/server/lib/streams/internal_stream_mapping.ts new file mode 100644 index 0000000000000..8e88eeef8cd84 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/internal_stream_mapping.ts @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import { STREAMS_INDEX } from '../../../common/constants'; + +export function createStreamsIndex(scopedClusterClient: IScopedClusterClient) { + return scopedClusterClient.asInternalUser.indices.create({ + index: STREAMS_INDEX, + mappings: { + dynamic: 'strict', + properties: { + processing: { + type: 'object', + enabled: false, + }, + fields: { + type: 'object', + enabled: false, + }, + children: { + type: 'object', + enabled: false, + }, + id: { + type: 'keyword', + }, + }, + }, + }); +} diff --git a/x-pack/plugins/streams/server/lib/streams/root_stream_definition.ts b/x-pack/plugins/streams/server/lib/streams/root_stream_definition.ts new file mode 100644 index 0000000000000..2b7deed877309 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/root_stream_definition.ts @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StreamDefinition } from '../../../common/types'; + +export const rootStreamDefinition: StreamDefinition = { + id: 'logs', + processing: [], + children: [], + fields: [ + { + name: '@timestamp', + type: 'date', + }, + { + name: 'message', + type: 'match_only_text', + }, + { + name: 'host.name', + type: 'keyword', + }, + { + name: 'log.level', + type: 'keyword', + }, + ], +}; diff --git a/x-pack/plugins/streams/server/lib/streams/stream_crud.ts b/x-pack/plugins/streams/server/lib/streams/stream_crud.ts new file mode 100644 index 0000000000000..78a126905d9a4 --- /dev/null +++ b/x-pack/plugins/streams/server/lib/streams/stream_crud.ts @@ -0,0 +1,286 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import { Logger } from '@kbn/logging'; +import { FieldDefinition, StreamDefinition } from '../../../common/types'; +import { STREAMS_INDEX } from '../../../common/constants'; +import { DefinitionNotFound } from './errors'; +import { deleteTemplate, upsertTemplate } from './index_templates/manage_index_templates'; +import { generateLayer } from './component_templates/generate_layer'; +import { generateIngestPipeline } from './ingest_pipelines/generate_ingest_pipeline'; +import { generateReroutePipeline } from './ingest_pipelines/generate_reroute_pipeline'; +import { generateIndexTemplate } from './index_templates/generate_index_template'; +import { deleteComponent, upsertComponent } from './component_templates/manage_component_templates'; +import { getIndexTemplateName } from './index_templates/name'; +import { getComponentTemplateName } from './component_templates/name'; +import { getProcessingPipelineName, getReroutePipelineName } from './ingest_pipelines/name'; +import { + deleteIngestPipeline, + upsertIngestPipeline, +} from './ingest_pipelines/manage_ingest_pipelines'; +import { getAncestors } from './helpers/hierarchy'; +import { MalformedFields } from './errors/malformed_fields'; +import { + deleteDataStream, + rolloverDataStreamIfNecessary, + upsertDataStream, +} from './data_streams/manage_data_streams'; + +interface BaseParams { + scopedClusterClient: IScopedClusterClient; +} + +interface BaseParamsWithDefinition extends BaseParams { + definition: StreamDefinition; +} + +interface DeleteStreamParams extends BaseParams { + id: string; + logger: Logger; +} + +export async function deleteStreamObjects({ id, scopedClusterClient, logger }: DeleteStreamParams) { + await deleteDataStream({ + esClient: scopedClusterClient.asCurrentUser, + name: id, + logger, + }); + await deleteTemplate({ + esClient: scopedClusterClient.asCurrentUser, + name: getIndexTemplateName(id), + logger, + }); + await deleteComponent({ + esClient: scopedClusterClient.asCurrentUser, + name: getComponentTemplateName(id), + logger, + }); + await deleteIngestPipeline({ + esClient: scopedClusterClient.asCurrentUser, + id: getProcessingPipelineName(id), + logger, + }); + await deleteIngestPipeline({ + esClient: scopedClusterClient.asCurrentUser, + id: getReroutePipelineName(id), + logger, + }); + await scopedClusterClient.asInternalUser.delete({ + id, + index: STREAMS_INDEX, + refresh: 'wait_for', + }); +} + +async function upsertInternalStream({ definition, scopedClusterClient }: BaseParamsWithDefinition) { + return scopedClusterClient.asInternalUser.index({ + id: definition.id, + index: STREAMS_INDEX, + document: definition, + refresh: 'wait_for', + }); +} + +type ListStreamsParams = BaseParams; + +export async function listStreams({ scopedClusterClient }: ListStreamsParams) { + const response = await scopedClusterClient.asInternalUser.search({ + index: STREAMS_INDEX, + size: 10000, + fields: ['id'], + _source: false, + sort: [{ id: 'asc' }], + }); + const definitions = response.hits.hits.map((hit) => hit.fields as { id: string[] }); + return definitions; +} + +interface ReadStreamParams extends BaseParams { + id: string; +} + +export async function readStream({ id, scopedClusterClient }: ReadStreamParams) { + try { + const response = await scopedClusterClient.asInternalUser.get({ + id, + index: STREAMS_INDEX, + }); + const definition = response._source as StreamDefinition; + return { + definition, + }; + } catch (e) { + if (e.meta?.statusCode === 404) { + throw new DefinitionNotFound(`Stream definition for ${id} not found.`); + } + throw e; + } +} + +interface ReadAncestorsParams extends BaseParams { + id: string; +} + +export async function readAncestors({ id, scopedClusterClient }: ReadAncestorsParams) { + const ancestorIds = getAncestors(id); + + return await Promise.all( + ancestorIds.map((ancestorId) => readStream({ scopedClusterClient, id: ancestorId })) + ); +} + +interface ReadDescendantsParams extends BaseParams { + id: string; +} + +export async function readDescendants({ id, scopedClusterClient }: ReadDescendantsParams) { + const response = await scopedClusterClient.asInternalUser.search({ + index: STREAMS_INDEX, + size: 10000, + body: { + query: { + bool: { + filter: { + prefix: { + id, + }, + }, + must_not: { + term: { + id, + }, + }, + }, + }, + }, + }); + return response.hits.hits.map((hit) => hit._source as StreamDefinition); +} + +export async function validateAncestorFields( + scopedClusterClient: IScopedClusterClient, + id: string, + fields: FieldDefinition[] +) { + const ancestors = await readAncestors({ + id, + scopedClusterClient, + }); + for (const ancestor of ancestors) { + for (const field of fields) { + if ( + ancestor.definition.fields.some( + (ancestorField) => ancestorField.type !== field.type && ancestorField.name === field.name + ) + ) { + throw new MalformedFields( + `Field ${field.name} is already defined with incompatible type in the parent stream ${ancestor.definition.id}` + ); + } + } + } +} + +export async function validateDescendantFields( + scopedClusterClient: IScopedClusterClient, + id: string, + fields: FieldDefinition[] +) { + const descendants = await readDescendants({ + id, + scopedClusterClient, + }); + for (const descendant of descendants) { + for (const field of fields) { + if ( + descendant.fields.some( + (descendantField) => + descendantField.type !== field.type && descendantField.name === field.name + ) + ) { + throw new MalformedFields( + `Field ${field.name} is already defined with incompatible type in the child stream ${descendant.id}` + ); + } + } + } +} + +export async function checkStreamExists({ id, scopedClusterClient }: ReadStreamParams) { + try { + await readStream({ id, scopedClusterClient }); + return true; + } catch (e) { + if (e instanceof DefinitionNotFound) { + return false; + } + throw e; + } +} + +interface SyncStreamParams { + scopedClusterClient: IScopedClusterClient; + definition: StreamDefinition; + rootDefinition?: StreamDefinition; + logger: Logger; +} + +export async function syncStream({ + scopedClusterClient, + definition, + rootDefinition, + logger, +}: SyncStreamParams) { + await upsertComponent({ + esClient: scopedClusterClient.asCurrentUser, + logger, + component: generateLayer(definition.id, definition), + }); + await upsertIngestPipeline({ + esClient: scopedClusterClient.asCurrentUser, + logger, + pipeline: generateIngestPipeline(definition.id, definition), + }); + const reroutePipeline = await generateReroutePipeline({ + definition, + }); + await upsertIngestPipeline({ + esClient: scopedClusterClient.asCurrentUser, + logger, + pipeline: reroutePipeline, + }); + await upsertTemplate({ + esClient: scopedClusterClient.asCurrentUser, + logger, + template: generateIndexTemplate(definition.id), + }); + if (rootDefinition) { + const parentReroutePipeline = await generateReroutePipeline({ + definition: rootDefinition, + }); + await upsertIngestPipeline({ + esClient: scopedClusterClient.asCurrentUser, + logger, + pipeline: parentReroutePipeline, + }); + } + await upsertDataStream({ + esClient: scopedClusterClient.asCurrentUser, + logger, + name: definition.id, + }); + await upsertInternalStream({ + scopedClusterClient, + definition, + }); + await rolloverDataStreamIfNecessary({ + esClient: scopedClusterClient.asCurrentUser, + name: definition.id, + logger, + }); +} diff --git a/x-pack/plugins/streams/server/plugin.ts b/x-pack/plugins/streams/server/plugin.ts new file mode 100644 index 0000000000000..ef070984803d5 --- /dev/null +++ b/x-pack/plugins/streams/server/plugin.ts @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { + CoreSetup, + CoreStart, + KibanaRequest, + Logger, + Plugin, + PluginConfigDescriptor, + PluginInitializerContext, +} from '@kbn/core/server'; +import { registerRoutes } from '@kbn/server-route-repository'; +import { StreamsConfig, configSchema, exposeToBrowserConfig } from '../common/config'; +import { StreamsRouteRepository } from './routes'; +import { RouteDependencies } from './routes/types'; +import { + StreamsPluginSetupDependencies, + StreamsPluginStartDependencies, + StreamsServer, +} from './types'; + +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface StreamsPluginSetup {} +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface StreamsPluginStart {} + +export const config: PluginConfigDescriptor = { + schema: configSchema, + exposeToBrowser: exposeToBrowserConfig, +}; + +export class StreamsPlugin + implements + Plugin< + StreamsPluginSetup, + StreamsPluginStart, + StreamsPluginSetupDependencies, + StreamsPluginStartDependencies + > +{ + public config: StreamsConfig; + public logger: Logger; + public server?: StreamsServer; + + constructor(context: PluginInitializerContext) { + this.config = context.config.get(); + this.logger = context.logger.get(); + } + + public setup(core: CoreSetup, plugins: StreamsPluginSetupDependencies): StreamsPluginSetup { + this.server = { + config: this.config, + logger: this.logger, + } as StreamsServer; + + registerRoutes({ + repository: StreamsRouteRepository, + dependencies: { + server: this.server, + getScopedClients: async ({ request }: { request: KibanaRequest }) => { + const [coreStart] = await core.getStartServices(); + const scopedClusterClient = coreStart.elasticsearch.client.asScoped(request); + const soClient = coreStart.savedObjects.getScopedClient(request); + return { scopedClusterClient, soClient }; + }, + }, + core, + logger: this.logger, + }); + + return {}; + } + + public start(core: CoreStart, plugins: StreamsPluginStartDependencies): StreamsPluginStart { + if (this.server) { + this.server.core = core; + this.server.isServerless = core.elasticsearch.getCapabilities().serverless; + this.server.security = plugins.security; + this.server.encryptedSavedObjects = plugins.encryptedSavedObjects; + this.server.taskManager = plugins.taskManager; + } + + return {}; + } + + public stop() {} +} diff --git a/x-pack/plugins/streams/server/routes/create_server_route.ts b/x-pack/plugins/streams/server/routes/create_server_route.ts new file mode 100644 index 0000000000000..94d85a71c82bb --- /dev/null +++ b/x-pack/plugins/streams/server/routes/create_server_route.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { createServerRouteFactory } from '@kbn/server-route-repository'; +import { StreamsRouteHandlerResources } from './types'; + +export const createServerRoute = createServerRouteFactory(); diff --git a/x-pack/plugins/streams/server/routes/index.ts b/x-pack/plugins/streams/server/routes/index.ts new file mode 100644 index 0000000000000..6fc734d3371b4 --- /dev/null +++ b/x-pack/plugins/streams/server/routes/index.ts @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { deleteStreamRoute } from './streams/delete'; +import { editStreamRoute } from './streams/edit'; +import { enableStreamsRoute } from './streams/enable'; +import { forkStreamsRoute } from './streams/fork'; +import { listStreamsRoute } from './streams/list'; +import { readStreamRoute } from './streams/read'; +import { resyncStreamsRoute } from './streams/resync'; + +export const StreamsRouteRepository = { + ...enableStreamsRoute, + ...resyncStreamsRoute, + ...forkStreamsRoute, + ...readStreamRoute, + ...editStreamRoute, + ...deleteStreamRoute, + ...listStreamsRoute, +}; + +export type StreamsRouteRepository = typeof StreamsRouteRepository; diff --git a/x-pack/plugins/streams/server/routes/streams/delete.ts b/x-pack/plugins/streams/server/routes/streams/delete.ts new file mode 100644 index 0000000000000..3820975dbe16a --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/delete.ts @@ -0,0 +1,109 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import { Logger } from '@kbn/logging'; +import { + DefinitionNotFound, + ForkConditionMissing, + IndexTemplateNotFound, + SecurityException, +} from '../../lib/streams/errors'; +import { createServerRoute } from '../create_server_route'; +import { syncStream, readStream, deleteStreamObjects } from '../../lib/streams/stream_crud'; +import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id'; +import { getParentId } from '../../lib/streams/helpers/hierarchy'; + +export const deleteStreamRoute = createServerRoute({ + endpoint: 'DELETE /api/streams/{id} 2023-10-31', + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + params: z.object({ + path: z.object({ + id: z.string(), + }), + }), + handler: async ({ response, params, logger, request, getScopedClients }) => { + try { + const { scopedClusterClient } = await getScopedClients({ request }); + + const parentId = getParentId(params.path.id); + if (!parentId) { + throw new MalformedStreamId('Cannot delete root stream'); + } + + await updateParentStream(scopedClusterClient, params.path.id, parentId, logger); + + await deleteStream(scopedClusterClient, params.path.id, logger); + + return response.ok({ body: { acknowledged: true } }); + } catch (e) { + if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) { + return response.notFound({ body: e }); + } + + if ( + e instanceof SecurityException || + e instanceof ForkConditionMissing || + e instanceof MalformedStreamId + ) { + return response.customError({ body: e, statusCode: 400 }); + } + + return response.customError({ body: e, statusCode: 500 }); + } + }, +}); + +async function deleteStream(scopedClusterClient: IScopedClusterClient, id: string, logger: Logger) { + try { + const { definition } = await readStream({ scopedClusterClient, id }); + for (const child of definition.children) { + await deleteStream(scopedClusterClient, child.id, logger); + } + await deleteStreamObjects({ scopedClusterClient, id, logger }); + } catch (e) { + if (e instanceof DefinitionNotFound) { + logger.debug(`Stream definition for ${id} not found.`); + } else { + throw e; + } + } +} + +async function updateParentStream( + scopedClusterClient: IScopedClusterClient, + id: string, + parentId: string, + logger: Logger +) { + const { definition: parentDefinition } = await readStream({ + scopedClusterClient, + id: parentId, + }); + + parentDefinition.children = parentDefinition.children.filter((child) => child.id !== id); + + await syncStream({ + scopedClusterClient, + definition: parentDefinition, + logger, + }); + return parentDefinition; +} diff --git a/x-pack/plugins/streams/server/routes/streams/edit.ts b/x-pack/plugins/streams/server/routes/streams/edit.ts new file mode 100644 index 0000000000000..b82b4d54044da --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/edit.ts @@ -0,0 +1,171 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import { Logger } from '@kbn/logging'; +import { + DefinitionNotFound, + ForkConditionMissing, + IndexTemplateNotFound, + SecurityException, +} from '../../lib/streams/errors'; +import { createServerRoute } from '../create_server_route'; +import { StreamDefinition, streamWithoutIdDefinitonSchema } from '../../../common/types'; +import { + syncStream, + readStream, + checkStreamExists, + validateAncestorFields, + validateDescendantFields, +} from '../../lib/streams/stream_crud'; +import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id'; +import { getParentId } from '../../lib/streams/helpers/hierarchy'; +import { MalformedChildren } from '../../lib/streams/errors/malformed_children'; + +export const editStreamRoute = createServerRoute({ + endpoint: 'PUT /api/streams/{id} 2023-10-31', + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + params: z.object({ + path: z.object({ + id: z.string(), + }), + body: streamWithoutIdDefinitonSchema, + }), + handler: async ({ response, params, logger, request, getScopedClients }) => { + try { + const { scopedClusterClient } = await getScopedClients({ request }); + + await validateStreamChildren(scopedClusterClient, params.path.id, params.body.children); + await validateAncestorFields(scopedClusterClient, params.path.id, params.body.fields); + await validateDescendantFields(scopedClusterClient, params.path.id, params.body.fields); + + const parentId = getParentId(params.path.id); + let parentDefinition: StreamDefinition | undefined; + + if (parentId) { + parentDefinition = await updateParentStream( + scopedClusterClient, + parentId, + params.path.id, + logger + ); + } + const streamDefinition = { ...params.body }; + + await syncStream({ + scopedClusterClient, + definition: { ...streamDefinition, id: params.path.id }, + rootDefinition: parentDefinition, + logger, + }); + + for (const child of streamDefinition.children) { + const streamExists = await checkStreamExists({ + scopedClusterClient, + id: child.id, + }); + if (streamExists) { + continue; + } + // create empty streams for each child if they don't exist + const childDefinition = { + id: child.id, + children: [], + fields: [], + processing: [], + }; + + await syncStream({ + scopedClusterClient, + definition: childDefinition, + logger, + }); + } + + return response.ok({ body: { acknowledged: true } }); + } catch (e) { + if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) { + return response.notFound({ body: e }); + } + + if ( + e instanceof SecurityException || + e instanceof ForkConditionMissing || + e instanceof MalformedStreamId + ) { + return response.customError({ body: e, statusCode: 400 }); + } + + return response.customError({ body: e, statusCode: 500 }); + } + }, +}); + +async function updateParentStream( + scopedClusterClient: IScopedClusterClient, + parentId: string, + id: string, + logger: Logger +) { + const { definition: parentDefinition } = await readStream({ + scopedClusterClient, + id: parentId, + }); + + if (!parentDefinition.children.some((child) => child.id === id)) { + // add the child to the parent stream with an empty condition for now + parentDefinition.children.push({ + id, + condition: undefined, + }); + + await syncStream({ + scopedClusterClient, + definition: parentDefinition, + logger, + }); + } + return parentDefinition; +} + +async function validateStreamChildren( + scopedClusterClient: IScopedClusterClient, + id: string, + children: Array<{ id: string }> +) { + try { + const { definition: oldDefinition } = await readStream({ + scopedClusterClient, + id, + }); + const oldChildren = oldDefinition.children.map((child) => child.id); + const newChildren = new Set(children.map((child) => child.id)); + if (oldChildren.some((child) => !newChildren.has(child))) { + throw new MalformedChildren( + 'Cannot remove children from a stream, please delete the stream instead' + ); + } + } catch (e) { + // Ignore if the stream does not exist, but re-throw if it's another error + if (!(e instanceof DefinitionNotFound)) { + throw e; + } + } +} diff --git a/x-pack/plugins/streams/server/routes/streams/enable.ts b/x-pack/plugins/streams/server/routes/streams/enable.ts new file mode 100644 index 0000000000000..27d8929b28e50 --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/enable.ts @@ -0,0 +1,48 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { SecurityException } from '../../lib/streams/errors'; +import { createServerRoute } from '../create_server_route'; +import { syncStream } from '../../lib/streams/stream_crud'; +import { rootStreamDefinition } from '../../lib/streams/root_stream_definition'; +import { createStreamsIndex } from '../../lib/streams/internal_stream_mapping'; + +export const enableStreamsRoute = createServerRoute({ + endpoint: 'POST /api/streams/_enable 2023-10-31', + params: z.object({}), + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + handler: async ({ request, response, logger, getScopedClients }) => { + try { + const { scopedClusterClient } = await getScopedClients({ request }); + await createStreamsIndex(scopedClusterClient); + await syncStream({ + scopedClusterClient, + definition: rootStreamDefinition, + logger, + }); + return response.ok({ body: { acknowledged: true } }); + } catch (e) { + if (e instanceof SecurityException) { + return response.customError({ body: e, statusCode: 400 }); + } + return response.customError({ body: e, statusCode: 500 }); + } + }, +}); diff --git a/x-pack/plugins/streams/server/routes/streams/fork.ts b/x-pack/plugins/streams/server/routes/streams/fork.ts new file mode 100644 index 0000000000000..44f4052878003 --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/fork.ts @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { + DefinitionNotFound, + ForkConditionMissing, + IndexTemplateNotFound, + SecurityException, +} from '../../lib/streams/errors'; +import { createServerRoute } from '../create_server_route'; +import { conditionSchema, streamDefinitonWithoutChildrenSchema } from '../../../common/types'; +import { syncStream, readStream, validateAncestorFields } from '../../lib/streams/stream_crud'; +import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id'; +import { isChildOf } from '../../lib/streams/helpers/hierarchy'; + +export const forkStreamsRoute = createServerRoute({ + endpoint: 'POST /api/streams/{id}/_fork 2023-10-31', + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + params: z.object({ + path: z.object({ + id: z.string(), + }), + body: z.object({ stream: streamDefinitonWithoutChildrenSchema, condition: conditionSchema }), + }), + handler: async ({ response, params, logger, request, getScopedClients }) => { + try { + if (!params.body.condition) { + throw new ForkConditionMissing('You must provide a condition to fork a stream'); + } + + const { scopedClusterClient } = await getScopedClients({ request }); + + const { definition: rootDefinition } = await readStream({ + scopedClusterClient, + id: params.path.id, + }); + + const childDefinition = { ...params.body.stream, children: [] }; + + // check whether root stream has a child of the given name already + if (rootDefinition.children.some((child) => child.id === childDefinition.id)) { + throw new MalformedStreamId( + `The stream with ID (${params.body.stream.id}) already exists as a child of the parent stream` + ); + } + + if (!isChildOf(rootDefinition, childDefinition)) { + throw new MalformedStreamId( + `The ID (${params.body.stream.id}) from the new stream must start with the parent's id (${rootDefinition.id}), followed by a dot and a name` + ); + } + + await validateAncestorFields( + scopedClusterClient, + params.body.stream.id, + params.body.stream.fields + ); + + rootDefinition.children.push({ + id: params.body.stream.id, + condition: params.body.condition, + }); + + await syncStream({ + scopedClusterClient, + definition: rootDefinition, + rootDefinition, + logger, + }); + + await syncStream({ + scopedClusterClient, + definition: childDefinition, + rootDefinition, + logger, + }); + + return response.ok({ body: { acknowledged: true } }); + } catch (e) { + if (e instanceof IndexTemplateNotFound || e instanceof DefinitionNotFound) { + return response.notFound({ body: e }); + } + + if ( + e instanceof SecurityException || + e instanceof ForkConditionMissing || + e instanceof MalformedStreamId + ) { + return response.customError({ body: e, statusCode: 400 }); + } + + return response.customError({ body: e, statusCode: 500 }); + } + }, +}); diff --git a/x-pack/plugins/streams/server/routes/streams/list.ts b/x-pack/plugins/streams/server/routes/streams/list.ts new file mode 100644 index 0000000000000..2e4f13a89bb41 --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/list.ts @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { createServerRoute } from '../create_server_route'; +import { DefinitionNotFound } from '../../lib/streams/errors'; +import { listStreams } from '../../lib/streams/stream_crud'; + +export const listStreamsRoute = createServerRoute({ + endpoint: 'GET /api/streams 2023-10-31', + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + params: z.object({}), + handler: async ({ response, request, getScopedClients }) => { + try { + const { scopedClusterClient } = await getScopedClients({ request }); + const definitions = await listStreams({ scopedClusterClient }); + + const trees = asTrees(definitions); + + return response.ok({ body: { streams: trees } }); + } catch (e) { + if (e instanceof DefinitionNotFound) { + return response.notFound({ body: e }); + } + + return response.customError({ body: e, statusCode: 500 }); + } + }, +}); + +interface ListStreamDefinition { + id: string; + children: ListStreamDefinition[]; +} + +function asTrees(definitions: Array<{ id: string[] }>) { + const trees: ListStreamDefinition[] = []; + definitions.forEach((definition) => { + const path = definition.id[0].split('.'); + let currentTree = trees; + path.forEach((_id, index) => { + const partialPath = path.slice(0, index + 1).join('.'); + const existingNode = currentTree.find((node) => node.id === partialPath); + if (existingNode) { + currentTree = existingNode.children; + } else { + const newNode = { id: partialPath, children: [] }; + currentTree.push(newNode); + currentTree = newNode.children; + } + }); + }); + return trees; +} diff --git a/x-pack/plugins/streams/server/routes/streams/read.ts b/x-pack/plugins/streams/server/routes/streams/read.ts new file mode 100644 index 0000000000000..5ea2aaf5f2542 --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/read.ts @@ -0,0 +1,60 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { createServerRoute } from '../create_server_route'; +import { DefinitionNotFound } from '../../lib/streams/errors'; +import { readAncestors, readStream } from '../../lib/streams/stream_crud'; + +export const readStreamRoute = createServerRoute({ + endpoint: 'GET /api/streams/{id} 2023-10-31', + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + params: z.object({ + path: z.object({ id: z.string() }), + }), + handler: async ({ response, params, request, getScopedClients }) => { + try { + const { scopedClusterClient } = await getScopedClients({ request }); + const streamEntity = await readStream({ + scopedClusterClient, + id: params.path.id, + }); + + const ancestors = await readAncestors({ + id: streamEntity.definition.id, + scopedClusterClient, + }); + + const body = { + ...streamEntity.definition, + inheritedFields: ancestors.flatMap(({ definition: { id, fields } }) => + fields.map((field) => ({ ...field, from: id })) + ), + }; + + return response.ok({ body }); + } catch (e) { + if (e instanceof DefinitionNotFound) { + return response.notFound({ body: e }); + } + + return response.customError({ body: e, statusCode: 500 }); + } + }, +}); diff --git a/x-pack/plugins/streams/server/routes/streams/resync.ts b/x-pack/plugins/streams/server/routes/streams/resync.ts new file mode 100644 index 0000000000000..2365252ab00e6 --- /dev/null +++ b/x-pack/plugins/streams/server/routes/streams/resync.ts @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { createServerRoute } from '../create_server_route'; +import { syncStream, readStream, listStreams } from '../../lib/streams/stream_crud'; + +export const resyncStreamsRoute = createServerRoute({ + endpoint: 'POST /api/streams/_resync 2023-10-31', + options: { + access: 'public', + availability: { + stability: 'experimental', + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + }, + params: z.object({}), + handler: async ({ response, logger, request, getScopedClients }) => { + const { scopedClusterClient } = await getScopedClients({ request }); + + const streams = await listStreams({ scopedClusterClient }); + + for (const stream of streams) { + const { definition } = await readStream({ + scopedClusterClient, + id: stream.id[0], + }); + await syncStream({ + scopedClusterClient, + definition, + logger, + }); + } + + return response.ok({}); + }, +}); diff --git a/x-pack/plugins/streams/server/routes/types.ts b/x-pack/plugins/streams/server/routes/types.ts new file mode 100644 index 0000000000000..d547d56c088cd --- /dev/null +++ b/x-pack/plugins/streams/server/routes/types.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { KibanaRequest } from '@kbn/core-http-server'; +import { DefaultRouteHandlerResources } from '@kbn/server-route-repository'; +import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; +import { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-server'; +import { StreamsServer } from '../types'; + +export interface RouteDependencies { + server: StreamsServer; + getScopedClients: ({ request }: { request: KibanaRequest }) => Promise<{ + scopedClusterClient: IScopedClusterClient; + soClient: SavedObjectsClientContract; + }>; +} + +export type StreamsRouteHandlerResources = RouteDependencies & DefaultRouteHandlerResources; diff --git a/x-pack/plugins/streams/server/types.ts b/x-pack/plugins/streams/server/types.ts new file mode 100644 index 0000000000000..f119faa0ed010 --- /dev/null +++ b/x-pack/plugins/streams/server/types.ts @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { CoreStart, ElasticsearchClient, Logger } from '@kbn/core/server'; +import { SecurityPluginStart } from '@kbn/security-plugin/server'; +import { + EncryptedSavedObjectsPluginSetup, + EncryptedSavedObjectsPluginStart, +} from '@kbn/encrypted-saved-objects-plugin/server'; +import { LicensingPluginStart } from '@kbn/licensing-plugin/server'; +import { + TaskManagerSetupContract, + TaskManagerStartContract, +} from '@kbn/task-manager-plugin/server'; +import { StreamsConfig } from '../common/config'; + +export interface StreamsServer { + core: CoreStart; + config: StreamsConfig; + logger: Logger; + security: SecurityPluginStart; + encryptedSavedObjects: EncryptedSavedObjectsPluginStart; + isServerless: boolean; + taskManager: TaskManagerStartContract; +} + +export interface ElasticsearchAccessorOptions { + elasticsearchClient: ElasticsearchClient; +} + +export interface StreamsPluginSetupDependencies { + encryptedSavedObjects: EncryptedSavedObjectsPluginSetup; + taskManager: TaskManagerSetupContract; +} + +export interface StreamsPluginStartDependencies { + security: SecurityPluginStart; + encryptedSavedObjects: EncryptedSavedObjectsPluginStart; + licensing: LicensingPluginStart; + taskManager: TaskManagerStartContract; +} diff --git a/x-pack/plugins/streams/tsconfig.json b/x-pack/plugins/streams/tsconfig.json new file mode 100644 index 0000000000000..c2fde35f9ca22 --- /dev/null +++ b/x-pack/plugins/streams/tsconfig.json @@ -0,0 +1,31 @@ +{ + "extends": "../../../tsconfig.base.json", + "compilerOptions": { + "outDir": "target/types" + }, + "include": [ + "../../../typings/**/*", + "common/**/*", + "server/**/*", + "public/**/*", + "types/**/*" + ], + "exclude": [ + "target/**/*" + ], + "kbn_references": [ + "@kbn/config-schema", + "@kbn/core", + "@kbn/logging", + "@kbn/core-plugins-server", + "@kbn/core-http-server", + "@kbn/security-plugin", + "@kbn/core-saved-objects-api-server", + "@kbn/core-elasticsearch-server", + "@kbn/task-manager-plugin", + "@kbn/server-route-repository", + "@kbn/zod", + "@kbn/encrypted-saved-objects-plugin", + "@kbn/licensing-plugin", + ] +} diff --git a/yarn.lock b/yarn.lock index 8795abae4cc38..cdc813bf0a1d3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7464,6 +7464,10 @@ version "0.0.0" uid "" +"@kbn/streams-plugin@link:x-pack/plugins/streams": + version "0.0.0" + uid "" + "@kbn/synthetics-e2e@link:x-pack/plugins/observability_solution/synthetics/e2e": version "0.0.0" uid ""