diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/index.ts b/x-pack/platform/packages/shared/kbn-streams-schema/index.ts index e8bad8893393f..2c753804275fb 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/index.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/index.ts @@ -47,6 +47,12 @@ export { type RoutingDefinition, routingDefinitionListSchema } from './src/model export { type ContentPack, contentPackSchema } from './src/content'; export { isRootStreamDefinition } from './src/helpers/is_root'; +export { + keepFields, + namespacePrefixes, + isNamespacedEcsField, + getRegularEcsField, +} from './src/helpers/namespaced_ecs'; export { getAdvancedParameters } from './src/helpers/get_advanced_parameters'; export { getInheritedFieldsFromAncestors } from './src/helpers/get_inherited_fields_from_ancestors'; @@ -73,6 +79,7 @@ export { type NamedFieldDefinitionConfig, type FieldDefinitionConfig, type InheritedFieldDefinitionConfig, + type InheritedFieldDefinition, type FieldDefinitionConfigAdvancedParameters, fieldDefinitionConfigSchema, namedFieldDefinitionConfigSchema, diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/fields/index.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/fields/index.ts index 8cc42033e157a..1cf80b0ca1f68 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/fields/index.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/fields/index.ts @@ -62,6 +62,7 @@ export const fieldDefinitionSchema: z.Schema = z.record( export type InheritedFieldDefinitionConfig = FieldDefinitionConfig & { from: string; + alias_for?: string; }; export interface InheritedFieldDefinition { @@ -70,7 +71,10 @@ export interface InheritedFieldDefinition { export const inheritedFieldDefinitionSchema: z.Schema = z.record( z.string(), - z.intersection(fieldDefinitionConfigSchema, z.object({ from: NonEmptyString })) + z.intersection( + fieldDefinitionConfigSchema, + z.object({ from: 
NonEmptyString, alias_for: z.optional(NonEmptyString) }) + ) ); export type NamedFieldDefinitionConfig = FieldDefinitionConfig & { diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/helpers/namespaced_ecs.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/helpers/namespaced_ecs.ts new file mode 100644 index 0000000000000..752ec62ddf0d3 --- /dev/null +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/helpers/namespaced_ecs.ts @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const keepFields: string[] = [ + '@timestamp', + 'observed_timestamp', + 'trace_id', + 'span_id', + 'severity_text', + 'body', + 'severity_number', + 'event_name', + 'dropped_attributes_count', + 'scope', + 'body.text', + 'body.structured', + 'resource.schema_url', + 'resource.dropped_attributes_count', +]; + +export const aliases: Record<string, string> = { + trace_id: 'trace.id', + span_id: 'span.id', + severity_text: 'log.level', + 'body.text': 'message', +}; + +export const namespacePrefixes = [ + 'body.structured.', + 'attributes.', + 'scope.attributes.', + 'resource.attributes.', +]; + +export function getRegularEcsField(field: string): string { + // strip the first matching namespace prefix to get the plain field name + for (const prefix of namespacePrefixes) { + if (field.startsWith(prefix)) { + return field.slice(prefix.length); + } + } + // own keys only, so names such as 'constructor' never resolve to Object.prototype members + if (Object.hasOwn(aliases, field)) { + return aliases[field]; + } + return field; +} + +export function isNamespacedEcsField(field: string): boolean { + return namespacePrefixes.some((prefix) => field.startsWith(prefix)) || keepFields.includes(field); +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.test.ts
b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.test.ts new file mode 100644 index 0000000000000..552196892503c --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.test.ts @@ -0,0 +1,188 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { Streams } from '@kbn/streams-schema'; +import { generateLayer } from './generate_layer'; + +describe('generateLayer', () => { + const definition: Streams.WiredStream.Definition = { + name: 'logs.abc', + description: '', + ingest: { + processing: [], + wired: { + routing: [], + fields: { + '@timestamp': { type: 'date', format: 'strict_date_optional_time' }, + message: { type: 'match_only_text' }, + 'attributes.myfield': { type: 'keyword' }, + }, + }, + lifecycle: { + // simulate DSL lifecycle + dsl: { data_retention: '30d' }, + }, + }, + }; + + it('should generate mappings with proper handling of fields and aliases', () => { + const result = generateLayer('logs.abc', definition, false); + expect(result).toMatchInlineSnapshot(` + Object { + "_meta": Object { + "description": "Default settings for the logs.abc stream", + "managed": true, + }, + "name": "logs.abc@stream.layer", + "template": Object { + "lifecycle": Object { + "data_retention": "30d", + "enabled": true, + }, + "mappings": Object { + "dynamic": false, + "properties": Object { + "@timestamp": Object { + "format": "strict_date_optional_time", + "ignore_malformed": false, + "type": "date", + }, + "attributes.myfield": Object { + "type": "keyword", + }, + "message": Object { + "type": "match_only_text", + }, + "myfield": Object { + "path": "attributes.myfield", + "type": "alias", + }, + }, + }, + "settings": Object { + "index.lifecycle.name": 
undefined, + "index.lifecycle.prefer_ilm": false, + }, + }, + "version": 1, + } + `); + }); + + it('should generate mappings for root stream', () => { + const result = generateLayer('logs', { ...definition, name: 'logs' }, true); + expect(result).toMatchInlineSnapshot(` + Object { + "_meta": Object { + "description": "Default settings for the logs stream", + "managed": true, + }, + "name": "logs@stream.layer", + "template": Object { + "lifecycle": Object { + "data_retention": "30d", + }, + "mappings": Object { + "dynamic": false, + "properties": Object { + "@timestamp": Object { + "format": "strict_date_optional_time", + "ignore_malformed": false, + "type": "date", + }, + "attributes": Object { + "subobjects": false, + "type": "object", + }, + "attributes.myfield": Object { + "type": "keyword", + }, + "body": Object { + "properties": Object { + "structured": Object { + "type": "flattened", + }, + "text": Object { + "type": "match_only_text", + }, + }, + "type": "object", + }, + "log.level": Object { + "path": "severity_text", + "type": "alias", + }, + "message": Object { + "type": "match_only_text", + }, + "myfield": Object { + "path": "attributes.myfield", + "type": "alias", + }, + "resource": Object { + "properties": Object { + "attributes": Object { + "subobjects": false, + "type": "object", + }, + "dropped_attributes_count": Object { + "type": "long", + }, + "schema_url": Object { + "ignore_above": 1024, + "type": "keyword", + }, + }, + "type": "object", + }, + "scope": Object { + "properties": Object { + "attributes": Object { + "subobjects": false, + "type": "object", + }, + }, + "type": "object", + }, + "span.id": Object { + "path": "span_id", + "type": "alias", + }, + "trace.id": Object { + "path": "trace_id", + "type": "alias", + }, + }, + }, + "settings": Object { + "index": Object { + "codec": "best_compression", + "mapping": Object { + "ignore_malformed": true, + "total_fields": Object { + "ignore_dynamic_beyond_limit": true, + }, + }, + "mode": 
"logsdb", + "sort": Object { + "field": Array [ + "resource.attributes.host.name", + "@timestamp", + ], + "order": Array [ + "asc", + "desc", + ], + }, + }, + }, + }, + "version": 1, + } + `); + }); +}); diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts index e1535c8b677a2..fe9002d366369 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/generate_layer.ts @@ -16,10 +16,12 @@ import { isDslLifecycle, isIlmLifecycle, isRoot, + namespacePrefixes, } from '@kbn/streams-schema'; import { ASSET_VERSION } from '../../../../common/constants'; import { logsSettings } from './logs_layer'; import { getComponentTemplateName } from './name'; +import { baseMappings } from './logs_layer'; export function generateLayer( name: string, @@ -50,6 +52,14 @@ export function generateLayer( } properties[field] = property; + const matchingPrefix = namespacePrefixes.find((prefix) => field.startsWith(prefix)); + if (matchingPrefix) { + const aliasName = field.substring(matchingPrefix.length); + properties[aliasName] = { + type: 'alias', + path: field, + }; + } }); return { @@ -58,9 +68,13 @@ export function generateLayer( lifecycle: getTemplateLifecycle(definition, isServerless), settings: getTemplateSettings(definition, isServerless), mappings: { - subobjects: false, dynamic: false, - properties, + properties: isRoot(name) + ? 
{ + ...baseMappings, + ...properties, + } + : properties, }, }, version: ASSET_VERSION, diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.test.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.test.ts new file mode 100644 index 0000000000000..174bbf429af22 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.test.ts @@ -0,0 +1,170 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { InheritedFieldDefinition, Streams } from '@kbn/streams-schema'; +import { addAliasesForNamespacedFields, baseMappings, baseFields } from './logs_layer'; + +describe('logs_layer', () => { + describe('addAliasesForNamespacedFields', () => { + let mockStreamDefinition: Streams.WiredStream.Definition; + let mockInheritedFields: InheritedFieldDefinition; + + beforeEach(() => { + // Mock stream definition + mockStreamDefinition = { + name: 'test-stream', + ingest: { + wired: { + fields: { + 'resource.attributes.host.name': { type: 'keyword' }, + 'attributes.transaction.id': { type: 'keyword' }, + 'regular.field': { type: 'keyword' }, + }, + }, + }, + } as unknown as Streams.WiredStream.Definition; + + // Mock inherited fields + mockInheritedFields = { + 'resource.attributes.service.name': { + type: 'keyword', + from: 'parent-stream', + }, + 'body.structured.data': { + type: 'keyword', + from: 'grandparent-stream', + }, + '@timestamp': { + type: 'date', + from: 'system', + }, + }; + }); + + it('should create aliases for all namespaced fields', () => { + const result = addAliasesForNamespacedFields(mockStreamDefinition, { + ...mockInheritedFields, + }); + + // Aliases for inherited fields + 
expect(result['service.name']).toEqual({ + type: 'keyword', + from: 'parent-stream', + alias_for: 'resource.attributes.service.name', + }); + + expect(result.data).toEqual({ + type: 'keyword', + from: 'grandparent-stream', + alias_for: 'body.structured.data', + }); + + // Aliases for stream fields + expect(result['host.name']).toEqual({ + type: 'keyword', + from: 'test-stream', + alias_for: 'resource.attributes.host.name', + }); + + expect(result['transaction.id']).toEqual({ + type: 'keyword', + from: 'test-stream', + alias_for: 'attributes.transaction.id', + }); + + // Regular fields should not have aliases + expect(result['regular.field']).toBeUndefined(); + }); + + it('should include aliases from base mappings', () => { + // Use real base mappings and base fields + const result = addAliasesForNamespacedFields(mockStreamDefinition, { + ...mockInheritedFields, + }); + + // Check for base alias mappings + Object.entries(baseMappings).forEach(([key, mapping]) => { + if (mapping.type === 'alias' && mapping.path) { + expect(result[key]).toEqual({ + type: baseFields[mapping.path].type, + alias_for: mapping.path, + from: 'logs', + }); + } + }); + + // Verify specific examples + expect(result['log.level']).toEqual({ + type: 'keyword', + alias_for: 'severity_text', + from: 'logs', + }); + + expect(result.message).toEqual({ + type: 'match_only_text', + alias_for: 'body.text', + from: 'logs', + }); + }); + + it('should handle empty fields', () => { + const emptyStreamDefinition = { + name: 'empty-stream', + ingest: { + wired: { + fields: {}, + }, + }, + } as unknown as Streams.WiredStream.Definition; + + const result = addAliasesForNamespacedFields(emptyStreamDefinition, {}); + + // Should only contain base aliases + const baseAliasCount = Object.values(baseMappings).filter( + (mapping) => mapping.type === 'alias' + ).length; + expect(Object.keys(result).length).toBe(baseAliasCount); + }); + + it('should handle conflicting aliases', () => { + // Create a scenario where two 
different namespaced fields would map to the same alias + const conflictingFields: InheritedFieldDefinition = { + 'resource.attributes.user': { + type: 'keyword', + from: 'service-a', + }, + 'attributes.user': { + type: 'keyword', + from: 'service-b', + }, + }; + + // The resource alias should overwrite the attributes due to the order of namespacePrefixes + const result = addAliasesForNamespacedFields( + { + ...mockStreamDefinition, + ingest: { + wired: { + fields: {}, + routing: [], + }, + lifecycle: { inherit: {} }, + processing: [], + }, + }, + conflictingFields + ); + + // 'user' should point to 'resource.attributes.user' as 'resource.attributes.' comes later in the namespacePrefixes array + expect(result.user).toEqual({ + type: 'keyword', + from: 'service-a', + alias_for: 'resource.attributes.user', + }); + }); + }); +}); diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts index f09bc39fe31fe..b74826dfef17b 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/component_templates/logs_layer.ts @@ -5,12 +5,22 @@ * 2.0. 
*/ -import { IndicesIndexSettings } from '@elastic/elasticsearch/lib/api/types'; +import { IndicesIndexSettings, MappingProperty } from '@elastic/elasticsearch/lib/api/types'; +import { + FieldDefinition, + InheritedFieldDefinition, + Streams, + namespacePrefixes, +} from '@kbn/streams-schema'; export const logsSettings: IndicesIndexSettings = { index: { mode: 'logsdb', codec: 'best_compression', + sort: { + field: ['resource.attributes.host.name', '@timestamp'], + order: ['asc', 'desc'], + }, mapping: { total_fields: { ignore_dynamic_beyond_limit: true, @@ -19,3 +29,198 @@ export const logsSettings: IndicesIndexSettings = { }, }, }; + +export const baseFields: FieldDefinition = { + '@timestamp': { + type: 'date', + }, + 'stream.name': { + type: 'system', + }, + 'scope.dropped_attributes_count': { + type: 'long', + }, + dropped_attributes_count: { + type: 'long', + }, + 'resource.dropped_attributes_count': { + type: 'long', + }, + 'resource.schema_url': { + type: 'keyword', + }, + 'scope.name': { + type: 'keyword', + }, + 'scope.schema_url': { + type: 'keyword', + }, + 'scope.version': { + type: 'keyword', + }, + observed_timestamp: { + type: 'date', + }, + trace_id: { + type: 'keyword', + }, + span_id: { + type: 'keyword', + }, + event_name: { + type: 'keyword', + }, + severity_text: { + type: 'keyword', + }, + 'body.text': { + type: 'match_only_text', + }, + severity_number: { + type: 'long', + }, + 'resource.attributes.host.name': { + type: 'keyword', + }, +}; + +export const baseMappings: Record = { + body: { + type: 'object', + properties: { + structured: { + type: 'flattened', + }, + text: { + type: 'match_only_text', + }, + }, + }, + attributes: { + type: 'object', + subobjects: false, + }, + resource: { + type: 'object', + properties: { + dropped_attributes_count: { + type: 'long', + }, + schema_url: { + ignore_above: 1024, + type: 'keyword', + }, + attributes: { + type: 'object', + subobjects: false, + }, + }, + }, + scope: { + type: 'object', + 
properties: { + attributes: { + type: 'object', + subobjects: false, + }, + }, + }, + 'span.id': { + path: 'span_id', + type: 'alias', + }, + message: { + path: 'body.text', + type: 'alias', + }, + 'trace.id': { + path: 'trace_id', + type: 'alias', + }, + 'log.level': { + path: 'severity_text', + type: 'alias', + }, +}; + +/** + * Takes a map of fields and returns its [name, definition] entries as a sorted array. + * Fields without a namespace prefix come first, sorted alphabetically; namespaced fields follow, + * grouped in the order of the namespacePrefixes array. + */ +function getSortedFields(fields: FieldDefinition) { + return Object.entries(fields).sort(([a], [b]) => { + const aPrefixIndex = namespacePrefixes.findIndex((prefix) => a.startsWith(prefix)); + const bPrefixIndex = namespacePrefixes.findIndex((prefix) => b.startsWith(prefix)); + + if (aPrefixIndex !== -1 && bPrefixIndex === -1) { + return 1; + } + if (aPrefixIndex === -1 && bPrefixIndex !== -1) { + return -1; + } + if (aPrefixIndex !== -1 && bPrefixIndex !== -1) { + return aPrefixIndex - bPrefixIndex; + } + return a.localeCompare(b); + }); +} + +const allNamespacesRegex = new RegExp(`^(${namespacePrefixes.join('|').replace(/\./g, '\\.')})`); + +/** + * Helper function that creates aliases for fields with namespace prefixes + * @param fields - The fields to process + * @param fromSource - The source to set in the 'from' property of the alias + * @param targetCollection - Where to add the new aliases + */ +const createAliasesForNamespacedFields = ( + fields: FieldDefinition, + fromSource: string | ((key: string) => string), + targetCollection: InheritedFieldDefinition +) => { + getSortedFields(fields).forEach(([key, fieldDef]) => { + if (namespacePrefixes.some((prefix) => key.startsWith(prefix))) { + const aliasKey = key.replace(allNamespacesRegex, ''); + const from = typeof fromSource === 'function' ?
fromSource(key) : fromSource; + + targetCollection[aliasKey] = { + ...fieldDef, + from, + alias_for: key, + }; + } + }); +}; + +export function addAliasesForNamespacedFields( + streamDefinition: Streams.WiredStream.Definition, + inheritedFields: InheritedFieldDefinition +) { + // Create aliases for inherited fields + createAliasesForNamespacedFields( + inheritedFields, + (key) => inheritedFields[key].from, + inheritedFields + ); + + // Create aliases for this stream's fields + createAliasesForNamespacedFields( + streamDefinition.ingest.wired.fields, + streamDefinition.name, + inheritedFields + ); + + // Add aliases defined in the base mappings + Object.entries(baseMappings).forEach(([key, fieldDef]) => { + if (fieldDef.type === 'alias') { + inheritedFields[key] = { + type: baseFields[fieldDef.path!].type, + alias_for: fieldDef.path, + from: 'logs', + }; + } + }); + + return inheritedFields; +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/namespaced_ecs.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/namespaced_ecs.ts deleted file mode 100644 index 1fec1c76430eb..0000000000000 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/namespaced_ecs.ts +++ /dev/null @@ -1,6 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_fields.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_fields.ts index da627f323fc6f..4c99a6572b0ba 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_fields.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/helpers/validate_fields.ts @@ -5,8 +5,15 @@ * 2.0. 
*/ -import { FieldDefinition, Streams, isRoot } from '@kbn/streams-schema'; +import { + FieldDefinition, + Streams, + isRoot, + keepFields, + namespacePrefixes, +} from '@kbn/streams-schema'; import { MalformedFieldsError } from '../errors/malformed_fields_error'; +import { baseMappings } from '../component_templates/logs_layer'; export function validateAncestorFields({ ancestors, @@ -17,8 +24,10 @@ export function validateAncestorFields({ }) { for (const ancestor of ancestors) { for (const fieldName in fields) { + if (!Object.hasOwn(fields, fieldName)) { + continue; + } if ( - Object.hasOwn(fields, fieldName) && Object.entries(ancestor.ingest.wired.fields).some( ([ancestorFieldName, attr]) => attr.type !== fields[fieldName].type && ancestorFieldName === fieldName @@ -28,6 +37,31 @@ export function validateAncestorFields({ `Field ${fieldName} is already defined with incompatible type in the parent stream ${ancestor.name}` ); } + if ( + !namespacePrefixes.some((prefix) => fieldName.startsWith(prefix)) && + !keepFields.includes(fieldName) + ) { + throw new MalformedFieldsError( + `Field ${fieldName} is not allowed to be defined as it doesn't match the namespaced ECS or OTel schema.` + ); + } + for (const prefix of namespacePrefixes) { + const prefixedName = `${prefix}${fieldName}`; + if ( + Object.hasOwn(fields, prefixedName) || + Object.hasOwn(ancestor.ingest.wired.fields, prefixedName) + ) { + throw new MalformedFieldsError( + `Field ${fieldName} is an automatic alias of ${prefixedName} because of otel compat mode` + ); + } + } + // check the otelMappings - they are aliases and are not allowed to have the same name as a field + if (fieldName in baseMappings) { + throw new MalformedFieldsError( + `Field ${fieldName} is an automatic alias of another field because of otel compat mode` + ); + } } } } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/ingest_pipelines/logs_default_pipeline.ts 
b/x-pack/platform/plugins/shared/streams/server/lib/streams/ingest_pipelines/logs_default_pipeline.ts index 90f941657faf4..d3fe09d068b3f 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/ingest_pipelines/logs_default_pipeline.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/ingest_pipelines/logs_default_pipeline.ts @@ -23,6 +23,97 @@ export const logsDefaultPipelineProcessors = [ { dot_expander: { field: '*', + ignore_failure: true, + }, + }, + { + // This is a placeholder for the ECS migration processor - once it exists on the Elasticsearch side, it can be removed here + // The exact behavior might slightly differ from the one in the ECS migration processor, but it is close enough for now. + script: { + lang: 'painless', + source: ` + if (ctx.resource?.attributes != null) return; + + // Initialize resource container. + ctx.resource = [:]; + ctx.resource.attributes = [:]; + + // Resource prefixes to look for + def resourcePrefixes = ["host", "cloud", "agent"]; + + // Process resource attributes based on prefixes + def keysToProcess = new ArrayList(ctx.keySet()); + for (def key : keysToProcess) { + // Skip special keys + if (key.startsWith("_") || key == "@timestamp" || key == "resource") continue; + + boolean isResourceField = false; + + // Check if the key exactly matches one of our resource prefixes + if (resourcePrefixes.contains(key)) { + isResourceField = true; + } else { + // Check if the key starts with one of our resource prefixes followed by a dot + for (def prefix : resourcePrefixes) { + if (key.startsWith(prefix + ".")) { + isResourceField = true; + break; + } + } + } + + if (isResourceField && ctx[key] != null) { + ctx.resource.attributes[key] = ctx[key]; + ctx.remove(key); + } + } + + // Process the "message" field. + if (ctx.message != null) { + ctx.body = [:]; + ctx.body.text = ctx.message; + ctx.remove("message"); + } + + // Process "log.level" field. 
+ if (ctx.log?.level != null) { + ctx.severity_text = ctx.log.level; + ctx.log.remove("level"); + } + + // Collect any remaining keys into ctx.attributes (except reserved ones) and remove them. + ctx.attributes = [:]; + def keysToRemove = []; + for (entry in ctx.entrySet()) { + if (entry.getKey() != "@timestamp" && + entry.getKey() != "resource" && + !entry.getKey().startsWith("_") && + entry.getKey() != "severity_text" && + entry.getKey() != "attributes" && + entry.getKey() != "body" + ) { + ctx.attributes[entry.getKey()] = entry.getValue(); + keysToRemove.add(entry.getKey()); + } + } + for (key in keysToRemove) { + ctx.remove(key); + } + `, + }, + }, + { + dot_expander: { + path: 'resource.attributes', + field: '*', + ignore_failure: true, + }, + }, + { + dot_expander: { + path: 'attributes', + field: '*', + ignore_failure: true, }, }, ]; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/root_stream_definition.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/root_stream_definition.ts index a8d004c50558b..15dba4bf7d8bb 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/root_stream_definition.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/root_stream_definition.ts @@ -6,6 +6,7 @@ */ import { Streams, getSegments } from '@kbn/streams-schema'; +import { baseFields } from './component_templates/logs_layer'; export const LOGS_ROOT_STREAM_NAME = 'logs'; @@ -18,21 +19,7 @@ export const rootStreamDefinition: Streams.WiredStream.Definition = { wired: { routing: [], fields: { - '@timestamp': { - type: 'date', - }, - message: { - type: 'match_only_text', - }, - 'host.name': { - type: 'keyword', - }, - 'log.level': { - type: 'keyword', - }, - 'stream.name': { - type: 'system', - }, + ...baseFields, }, }, }, diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts 
b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts index a9397ab91440c..897bf295582b2 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/streams/processing/simulation_handler.ts @@ -30,6 +30,7 @@ import { NamedFieldDefinitionConfig, FieldDefinitionConfig, InheritedFieldDefinitionConfig, + isNamespacedEcsField, FieldDefinition, Streams, } from '@kbn/streams-schema'; @@ -79,6 +80,10 @@ export type SimulationError = BaseSimulationError & type: 'non_additive_processor_failure'; processor_id: string; } + | { + type: 'non_namespaced_fields_failure'; + processor_id: string; + } | { type: 'reserved_field_failure'; processor_id: string; @@ -145,7 +150,10 @@ export const simulateProcessing = async ({ streamsClient, }: SimulateProcessingDeps) => { /* 0. Retrieve required data to prepare the simulation */ - const streamIndex = await getStreamIndex(scopedClusterClient, streamsClient, params.path.name); + const [stream, streamIndex] = await Promise.all([ + streamsClient.getStream(params.path.name), + getStreamIndex(scopedClusterClient, streamsClient, params.path.name), + ]); /* 1. 
Prepare data for either simulation types (ingest, pipeline), prepare simulation body for the mandatory pipeline simulation */ const simulationData = prepareSimulationData(params); @@ -176,6 +184,7 @@ export const simulateProcessing = async ({ ingestSimulationResult.simulation, simulationData.docs, params.body.processing, + Streams.WiredStream.Definition.is(stream), streamFields ); @@ -231,16 +240,33 @@ const prepareSimulationProcessors = ( } as ProcessorDefinition; }); - const dotExpanderProcessor: Pick = { - dot_expander: { - field: '*', - override: true, + const dotExpanderProcessors: Array> = [ + { + dot_expander: { + field: '*', + ignore_failure: true, + override: true, + }, }, - }; + { + dot_expander: { + path: 'resource.attributes', + field: '*', + ignore_failure: true, + }, + }, + { + dot_expander: { + path: 'attributes', + field: '*', + ignore_failure: true, + }, + }, + ]; const formattedProcessors = formatToIngestProcessors(processors); - return [dotExpanderProcessor, ...formattedProcessors]; + return [...dotExpanderProcessors, ...formattedProcessors]; }; const prepareSimulationData = (params: ProcessingSimulationParams) => { @@ -392,6 +418,7 @@ const computePipelineSimulationResult = ( ingestSimulationResult: SimulateIngestResponse, sampleDocs: Array<{ _source: FlattenRecord }>, processing: ProcessorDefinitionWithId[], + isWiredStream: boolean, streamFields: FieldDefinition ): { docReports: SimulationDocReport[]; @@ -416,6 +443,7 @@ const computePipelineSimulationResult = ( const diff = computeSimulationDocDiff( pipelineDocResult, sampleDocs[id]._source, + isWiredStream, forbiddenFields ); @@ -509,8 +537,8 @@ const getDocumentStatus = ( if (ingestDocErrors.some((error) => error.type === 'field_mapping_failure')) { return 'failed'; } - // Remove the always present base processor for dot expander - const processorResults = doc.processor_results.slice(1); + // Remove the always present base processor for dot expanders + const processorResults = 
doc.processor_results.slice(3); if (processorResults.every(isSkippedProcessor)) { return 'skipped'; @@ -535,7 +563,7 @@ const getLastDoc = ( const status = getDocumentStatus(docResult, ingestDocErrors); const lastDocSource = docResult.processor_results - .slice(1) // Remove the always present base processor for dot expander + .slice(3) // Remove the always present base processors for dot expander .filter((proc) => !isSkippedProcessor(proc)) .at(-1)?.doc?._source ?? sample; @@ -560,6 +588,7 @@ const getLastDoc = ( const computeSimulationDocDiff = ( docResult: SuccessfulPipelineSimulateDocumentResult, sample: FlattenRecord, + isWiredStream: boolean, forbiddenFields: string[] ) => { // Keep only the successful processors defined from the user, skipping the on_failure processors from the simulation @@ -605,6 +634,16 @@ const computeSimulationDocDiff = ( const originalUpdatedFields = updatedFields .filter((field) => field in sample && !forbiddenFields.includes(field)) .sort(); + if (isWiredStream) { + const nonNamespacedFields = addedFields.filter((field) => !isNamespacedEcsField(field)); + if (!isEmpty(nonNamespacedFields)) { + diffResult.errors.push({ + processor_id: nextDoc.processor_id, + type: 'non_namespaced_fields_failure', + message: `The fields generated by the processor are not namespaced ECS fields: [${nonNamespacedFields.join()}]`, + }); + } + } if (forbiddenFields.some((field) => updatedFields.includes(field))) { diffResult.errors.push({ processor_id: nextDoc.processor_id, diff --git a/x-pack/platform/plugins/shared/streams/server/routes/streams/crud/read_stream.ts b/x-pack/platform/plugins/shared/streams/server/routes/streams/crud/read_stream.ts index 2fe380e0fced2..c7ef4850618bf 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/streams/crud/read_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/streams/crud/read_stream.ts @@ -18,6 +18,7 @@ import { getDataStreamLifecycle, getUnmanagedElasticsearchAssets, } from 
'../../../lib/streams/stream_crud'; +import { addAliasesForNamespacedFields } from '../../../lib/streams/component_templates/logs_layer'; import { DashboardLink } from '../../../../common/assets'; import { ASSET_TYPE } from '../../../lib/streams/assets/fields'; @@ -85,13 +86,18 @@ export async function readStream({ } satisfies Streams.UnwiredStream.GetResponse; } + const inheritedFields = addAliasesForNamespacedFields( + streamDefinition, + getInheritedFieldsFromAncestors(ancestors) + ); + const body: Streams.WiredStream.GetResponse = { stream: streamDefinition, dashboards, privileges, queries, effective_lifecycle: findInheritedLifecycle(streamDefinition, ancestors), - inherited_fields: getInheritedFieldsFromAncestors(ancestors), + inherited_fields: inheritedFields, }; return body; diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/field_type.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/field_type.tsx index 467420e90526a..cf4f77ef3279b 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/field_type.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/field_type.tsx @@ -8,9 +8,22 @@ import React from 'react'; import { FieldDefinitionConfig } from '@kbn/streams-schema'; import { FieldNameWithIcon } from '@kbn/react-field'; +import { i18n } from '@kbn/i18n'; import { FIELD_TYPE_MAP } from './constants'; -export const FieldType = ({ type }: { type: FieldDefinitionConfig['type'] }) => { +export const FieldType = ({ + type, + aliasFor, +}: { + type: FieldDefinitionConfig['type']; + aliasFor?: string; +}) => { + if (aliasFor) { + return i18n.translate('xpack.streams.fieldType.aliasFor', { + defaultMessage: 'Alias for {aliasFor}', + values: { aliasFor }, + }); + } return ( { const { useFieldsMetadata } = useKibana().dependencies.start.fieldsMetadata; + const ecsFieldName = 
getRegularEcsField(field.name); + const { fieldsMetadata, loading } = useFieldsMetadata( - { attributes: ['type'], fieldNames: [field.name] }, + { attributes: ['type'], fieldNames: [ecsFieldName] }, [field] ); // Propagate recommendation to state if a type is not already set - const recommendation = fieldsMetadata?.[field.name]?.type; + const recommendation = fieldsMetadata?.[ecsFieldName]?.type; useEffect(() => { if ( diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts index 122f0051f910a..11fc1a092aa58 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/hooks/use_schema_fields.ts @@ -62,6 +62,7 @@ export const useSchemaFields = ({ format: 'format' in field ? 
field.format : undefined, additionalParameters: getAdvancedParameters(name, field), parent: field.from, + alias_for: field.alias_for, status: 'inherited', }) ); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/schema_editor_table.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/schema_editor_table.tsx index 2e3066a080be2..e31f0c51fe5d7 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/schema_editor_table.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/schema_editor_table.tsx @@ -101,7 +101,7 @@ const createCellRenderer = if (columnId === 'type') { if (!field.type) return EMPTY_CONTENT; - return ; + return ; } if (columnId === 'parent') { diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/types.ts b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/types.ts index 2b849515ece0c..30a45239537ca 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/components/data_management/schema_editor/types.ts @@ -18,6 +18,7 @@ export type SchemaFieldType = FieldDefinitionConfig['type']; export interface BaseSchemaField extends Omit { name: string; parent: string; + alias_for?: string; format?: string; } diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/enrichment.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/enrichment.ts index 6d38aefaee934..f3812befc7852 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/enrichment.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/enrichment.ts @@ -36,7 +36,7 @@ export default function ({ 
getService }: DeploymentAgnosticFtrProviderContext) { name: 'logs.nginx', }, if: { - field: 'host.name', + field: 'resource.attributes.host.name', operator: 'eq' as const, value: 'routeme', }, @@ -60,19 +60,19 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { processing: [ { grok: { - field: 'message', + field: 'body.text', patterns: [ - '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + '%{TIMESTAMP_ISO8601:attributes.inner_timestamp} %{LOGLEVEL:severity_text} %{GREEDYDATA:attributes.message2}', ], if: { always: {} }, }, }, { dissect: { - field: 'message2', - pattern: '%{log.logger} %{message3}', + field: 'attributes.message2', + pattern: '%{attributes.log.logger} %{attributes.message3}', if: { - field: 'log.level', + field: 'severity_text', operator: 'eq', value: 'info', }, @@ -82,21 +82,9 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { wired: { routing: [], fields: { - '@timestamp': { - type: 'date', - }, - message: { - type: 'match_only_text', - }, - message2: { + 'attributes.message2': { type: 'match_only_text', }, - 'host.name': { - type: 'keyword', - }, - 'log.level': { - type: 'keyword', - }, }, }, }, @@ -118,12 +106,20 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const result = await fetchDocument(esClient, 'logs.nginx', response._id); expect(result._source).to.eql({ '@timestamp': '2024-01-01T00:00:10.000Z', - message: '2023-01-01T00:00:10.000Z error test', - 'host.name': 'routeme', - inner_timestamp: '2023-01-01T00:00:10.000Z', - message2: 'test', - 'log.level': 'error', - 'stream.name': 'logs.nginx', + body: { + text: '2023-01-01T00:00:10.000Z error test', + }, + resource: { + attributes: { + 'host.name': 'routeme', + }, + }, + attributes: { + inner_timestamp: '2023-01-01T00:00:10.000Z', + message2: 'test', + }, + severity_text: 'error', + stream: { name: 'logs.nginx' }, }); }); @@ -139,14 +135,22 @@ export 
default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const result = await fetchDocument(esClient, 'logs.nginx', response._id); expect(result._source).to.eql({ '@timestamp': '2024-01-01T00:00:11.000Z', - message: '2023-01-01T00:00:10.000Z info mylogger this is the message', - inner_timestamp: '2023-01-01T00:00:10.000Z', - 'host.name': 'routeme', - 'log.level': 'info', - 'log.logger': 'mylogger', - message2: 'mylogger this is the message', - message3: 'this is the message', - 'stream.name': 'logs.nginx', + body: { + text: '2023-01-01T00:00:10.000Z info mylogger this is the message', + }, + resource: { + attributes: { + 'host.name': 'routeme', + }, + }, + attributes: { + inner_timestamp: '2023-01-01T00:00:10.000Z', + 'log.logger': 'mylogger', + message2: 'mylogger this is the message', + message3: 'this is the message', + }, + severity_text: 'info', + stream: { name: 'logs.nginx' }, }); }); @@ -155,7 +159,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { index: 'logs.nginx', query: { match: { - message2: 'mylogger', + 'attributes.message2': 'mylogger', }, }, }); @@ -167,7 +171,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { index: 'logs.nginx', query: { match: { - 'log.logger': 'mylogger', + 'attributes.log.logger': 'mylogger', }, }, }); diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/flush_config.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/flush_config.ts index a36bfcc8b4c5d..488f9ef519627 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/flush_config.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/flush_config.ts @@ -77,7 +77,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { (logsDeeplyNestedStreamname.body as Streams.WiredStream.GetResponse).stream.ingest.wired .fields ).to.eql({ - field2: 
{ + 'attributes.field2': { type: 'keyword', }, }); @@ -87,7 +87,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const logsResponse = await esClient.search({ index: 'logs', query: { - match: { 'log.level': 'info' }, + match: { severity_text: 'info' }, }, }); @@ -96,7 +96,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const logsTestResponse = await esClient.search({ index: 'logs.test', query: { - match: { numberfield: 20 }, + match: { 'attributes.numberfield': 20 }, }, }); @@ -105,7 +105,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const logsTest2Response = await esClient.search({ index: 'logs.test2', query: { - match: { field2: 'abc' }, + match: { 'attributes.field2': 'abc' }, }, }); diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/full_flow.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/full_flow.ts index 2352e435c87ec..2e452371f951b 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/full_flow.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/full_flow.ts @@ -140,10 +140,12 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { expect(result._index).to.match(/^\.ds\-logs-.*/); expect(result._source).to.eql({ '@timestamp': '2024-01-01T00:00:00.000Z', - message: 'test', - 'log.level': 'info', - 'log.logger': 'nginx', - 'stream.name': 'logs', + body: { + text: 'test', + }, + severity_text: 'info', + attributes: { 'log.logger': 'nginx' }, + stream: { name: 'logs' }, }); }); @@ -160,11 +162,13 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const result = await indexAndAssertTargetStream(esClient, 'logs', doc); expect(result._source).to.eql({ '@timestamp': '2024-01-01T00:00:00.000Z', - message: 'test', - 'log.level': 'info', - 'log.logger': 'nginx', 
- 'stream.name': 'logs', - stream: 'somethingelse', + body: { text: 'test' }, + severity_text: 'info', + attributes: { + 'log.logger': 'nginx', + stream: 'somethingelse', + }, + stream: { name: 'logs' }, }); }); @@ -174,7 +178,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { name: 'logs.nginx', }, if: { - field: 'log.logger', + field: 'attributes.log.logger', operator: 'eq' as const, value: 'nginx', }, @@ -210,10 +214,12 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const result = await indexAndAssertTargetStream(esClient, 'logs.nginx', doc); expect(result._source).to.eql({ '@timestamp': '2024-01-01T00:00:10.000Z', - message: 'test', - 'log.level': 'info', - 'log.logger': 'nginx', - 'stream.name': 'logs.nginx', + body: { text: 'test' }, + severity_text: 'info', + attributes: { + 'log.logger': 'nginx', + }, + stream: { name: 'logs.nginx' }, }); }); @@ -222,7 +228,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { name: 'logs.nginx.access', }, - if: { field: 'log.level', operator: 'eq' as const, value: 'info' }, + if: { field: 'severity_text', operator: 'eq' as const, value: 'info' }, }; const response = await forkStream(apiClient, 'logs.nginx', body); expect(response).to.have.property('acknowledged', true); @@ -240,10 +246,12 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const result = await indexAndAssertTargetStream(esClient, 'logs.nginx.access', doc); expect(result._source).to.eql({ '@timestamp': '2024-01-01T00:00:20.000Z', - message: 'test', - 'log.level': 'info', - 'log.logger': 'nginx', - 'stream.name': 'logs.nginx.access', + body: { text: 'test' }, + severity_text: 'info', + attributes: { + 'log.logger': 'nginx', + }, + stream: { name: 'logs.nginx.access' }, }); }); @@ -252,7 +260,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { name: 'logs.nginx.error', }, 
- if: { field: 'log', operator: 'eq' as const, value: 'error' }, + if: { field: 'attributes.log', operator: 'eq' as const, value: 'error' }, }; const response = await forkStream(apiClient, 'logs.nginx', body); expect(response).to.have.property('acknowledged', true); @@ -270,10 +278,12 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const result = await indexAndAssertTargetStream(esClient, 'logs.nginx', doc); expect(result._source).to.eql({ '@timestamp': '2024-01-01T00:00:20.000Z', - message: 'test', - 'log.level': 'error', - 'log.logger': 'nginx', - 'stream.name': 'logs.nginx', + body: { text: 'test' }, + severity_text: 'error', + attributes: { + 'log.logger': 'nginx', + }, + stream: { name: 'logs.nginx' }, }); }); @@ -282,7 +292,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { stream: { name: 'logs.number-test', }, - if: { field: 'code', operator: 'gte' as const, value: '500' }, + if: { field: 'attributes.code', operator: 'gte' as const, value: '500' }, }; const response = await forkStream(apiClient, 'logs', body); expect(response).to.have.property('acknowledged', true); @@ -314,8 +324,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, if: { or: [ - { field: 'message', operator: 'contains' as const, value: '500' }, - { field: 'message', operator: 'contains' as const, value: 400 }, + { field: 'body.text', operator: 'contains' as const, value: '500' }, + { field: 'body.text', operator: 'contains' as const, value: 400 }, ], }, }; @@ -347,7 +357,11 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, if: { or: [ - { field: '@abc.weird fieldname', operator: 'contains' as const, value: 'route_it' }, + { + field: 'attributes.@abc.weird fieldname', + operator: 'contains' as const, + value: 'route_it', + }, ], }, }; @@ -383,7 +397,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { processing: [], 
wired: { fields: { - myfield: { + 'attributes.myfield': { type: 'boolean', }, }, @@ -405,7 +419,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { wired: { ...body.stream.ingest.wired, fields: { - myfield: { + 'attributes.myfield': { type: 'keyword', }, }, @@ -428,7 +442,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { processing: [], wired: { fields: { - myfield: { + 'attributes.myfield': { type: 'system', }, }, @@ -477,7 +491,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { { destination: 'logs.nginx.error', if: { - field: 'log', + field: 'attributes.log', operator: 'eq', value: 'error', }, diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts index 27496bf03d300..b995397ba6707 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts @@ -26,13 +26,49 @@ const streams: StreamPutItem[] = [ '@timestamp': { type: 'date', }, - message: { - type: 'match_only_text', + 'scope.dropped_attributes_count': { + type: 'long', + }, + dropped_attributes_count: { + type: 'long', + }, + 'resource.dropped_attributes_count': { + type: 'long', + }, + 'resource.schema_url': { + type: 'keyword', + }, + 'scope.name': { + type: 'keyword', + }, + 'scope.schema_url': { + type: 'keyword', + }, + 'scope.version': { + type: 'keyword', + }, + observed_timestamp: { + type: 'date', + }, + trace_id: { + type: 'keyword', + }, + span_id: { + type: 'keyword', }, - 'host.name': { + event_name: { type: 'keyword', }, - 'log.level': { + severity_text: { + type: 'keyword', + }, + 'body.text': { + type: 'match_only_text', + }, + severity_number: { + type: 'long', + }, + 
'resource.attributes.host.name': { type: 'keyword', }, 'stream.name': { @@ -45,7 +81,7 @@ const streams: StreamPutItem[] = [ if: { and: [ { - field: 'numberfield', + field: 'attributes.numberfield', operator: 'gt', value: 15, }, @@ -57,7 +93,7 @@ const streams: StreamPutItem[] = [ if: { and: [ { - field: 'field2', + field: 'attributes.field2', operator: 'eq', value: 'abc', }, @@ -79,7 +115,7 @@ const streams: StreamPutItem[] = [ wired: { routing: [], fields: { - numberfield: { + 'attributes.numberfield': { type: 'long', }, }, @@ -96,15 +132,15 @@ const streams: StreamPutItem[] = [ processing: [ { grok: { - field: 'message', - patterns: ['%{NUMBER:numberfield}'], + field: 'body.text', + patterns: ['%{NUMBER:attributes.numberfield}'], if: { always: {} }, }, }, ], wired: { fields: { - field2: { + 'attributes.field2': { type: 'keyword', }, }, @@ -122,7 +158,7 @@ const streams: StreamPutItem[] = [ processing: [], wired: { fields: { - field2: { + 'attributes.field2': { type: 'keyword', }, }, diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/processing_simulate.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/processing_simulate.ts index 82c2b5b65bdee..b62a1ead0f2de 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/processing_simulate.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/processing_simulate.ts @@ -49,16 +49,17 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const testDoc = { '@timestamp': TEST_TIMESTAMP, - message: TEST_MESSAGE, - 'host.name': TEST_HOST, - 'log.level': 'error', + 'body.text': TEST_MESSAGE, + 'resource.attributes.host.name': TEST_HOST, + severity_text: 'error', }; const basicDissectProcessor = { id: 'dissect-uuid', dissect: { - field: 'message', - pattern: '%{parsed_timestamp} %{parsed_level} %{parsed_message}', + field: 'body.text', + pattern: + 
'%{attributes.parsed_timestamp} %{attributes.parsed_level} %{attributes.parsed_message}', if: { always: {} }, }, }; @@ -66,9 +67,9 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const basicGrokProcessor = { id: 'draft', grok: { - field: 'message', + field: 'body.text', patterns: [ - '%{TIMESTAMP_ISO8601:parsed_timestamp} %{LOGLEVEL:parsed_level} %{GREEDYDATA:parsed_message}', + '%{TIMESTAMP_ISO8601:attributes.parsed_timestamp} %{LOGLEVEL:attributes.parsed_level} %{GREEDYDATA:attributes.parsed_message}', ], if: { always: {} }, }, @@ -76,7 +77,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const createTestDocument = (message = TEST_MESSAGE) => ({ '@timestamp': TEST_TIMESTAMP, - message, + 'body.text': message, }); before(async () => { @@ -93,7 +94,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { name: 'logs.test', }, if: { - field: 'host.name', + field: 'resource.attributes.host.name', operator: 'eq' as const, value: TEST_HOST, }, @@ -118,13 +119,13 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { expect(status).to.be('parsed'); expect(errors).to.eql([]); expect(detected_fields).to.eql([ - { processor_id: 'draft', name: 'parsed_level' }, - { processor_id: 'draft', name: 'parsed_message' }, - { processor_id: 'draft', name: 'parsed_timestamp' }, + { processor_id: 'draft', name: 'attributes.parsed_level' }, + { processor_id: 'draft', name: 'attributes.parsed_message' }, + { processor_id: 'draft', name: 'attributes.parsed_timestamp' }, ]); - expect(value).to.have.property('parsed_level', 'error'); - expect(value).to.have.property('parsed_message', 'test'); - expect(value).to.have.property('parsed_timestamp', TEST_TIMESTAMP); + expect(value).to.have.property('attributes.parsed_level', 'error'); + expect(value).to.have.property('attributes.parsed_message', 'test'); + expect(value).to.have.property('attributes.parsed_timestamp', 
TEST_TIMESTAMP); }); it('should simulate with detected fields', async () => { @@ -132,8 +133,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { processing: [basicGrokProcessor], documents: [createTestDocument()], detected_fields: [ - { name: 'parsed_timestamp', type: 'date' }, - { name: 'parsed_level', type: 'keyword' }, + { name: 'attributes.parsed_timestamp', type: 'date' }, + { name: 'attributes.parsed_level', type: 'keyword' }, ], }); @@ -141,8 +142,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { response.body.detected_fields.find((f: { name: string }) => f.name === name); expect(response.body.detected_fields).to.have.length(3); // Including parsed_message - expect(findField('parsed_timestamp')).to.have.property('type', 'date'); - expect(findField('parsed_level')).to.have.property('type', 'keyword'); + expect(findField('attributes.parsed_timestamp')).to.have.property('type', 'date'); + expect(findField('attributes.parsed_level')).to.have.property('type', 'keyword'); }); it('should simulate multiple sequential processors', async () => { @@ -152,8 +153,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { { id: 'draft', grok: { - field: 'parsed_message', - patterns: ['%{IP:parsed_ip}'], + field: 'attributes.parsed_message', + patterns: ['%{IP:attributes.parsed_ip}'], if: { always: {} }, }, }, @@ -167,15 +168,15 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const { detected_fields, status, value } = response.body.documents[0]; expect(status).to.be('parsed'); expect(detected_fields).to.eql([ - { processor_id: 'dissect-uuid', name: 'parsed_level' }, - { processor_id: 'dissect-uuid', name: 'parsed_message' }, - { processor_id: 'dissect-uuid', name: 'parsed_timestamp' }, - { processor_id: 'draft', name: 'parsed_ip' }, + { processor_id: 'dissect-uuid', name: 'attributes.parsed_level' }, + { processor_id: 'dissect-uuid', name: 
'attributes.parsed_message' }, + { processor_id: 'dissect-uuid', name: 'attributes.parsed_timestamp' }, + { processor_id: 'draft', name: 'attributes.parsed_ip' }, ]); - expect(value).to.have.property('parsed_level', 'error'); - expect(value).to.have.property('parsed_message', 'test 127.0.0.1'); - expect(value).to.have.property('parsed_timestamp', TEST_TIMESTAMP); - expect(value).to.have.property('parsed_ip', '127.0.0.1'); + expect(value).to.have.property('attributes.parsed_level', 'error'); + expect(value).to.have.property('attributes.parsed_message', 'test 127.0.0.1'); + expect(value).to.have.property('attributes.parsed_timestamp', TEST_TIMESTAMP); + expect(value).to.have.property('attributes.parsed_ip', '127.0.0.1'); }); it('should simulate partially parsed documents', async () => { @@ -185,8 +186,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { { id: 'draft', grok: { - field: 'parsed_message', - patterns: ['%{TIMESTAMP_ISO8601:other_date}'], // This processor will fail, as won't match another date from the remaining message + field: 'attributes.parsed_message', + patterns: ['%{TIMESTAMP_ISO8601:attributes.other_date}'], // This processor will fail, as won't match another date from the remaining message if: { always: {} }, }, }, @@ -201,13 +202,13 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const { detected_fields, status, value } = response.body.documents[0]; expect(status).to.be('partially_parsed'); expect(detected_fields).to.eql([ - { processor_id: 'dissect-uuid', name: 'parsed_level' }, - { processor_id: 'dissect-uuid', name: 'parsed_message' }, - { processor_id: 'dissect-uuid', name: 'parsed_timestamp' }, + { processor_id: 'dissect-uuid', name: 'attributes.parsed_level' }, + { processor_id: 'dissect-uuid', name: 'attributes.parsed_message' }, + { processor_id: 'dissect-uuid', name: 'attributes.parsed_timestamp' }, ]); - expect(value).to.have.property('parsed_level', 'error'); - 
expect(value).to.have.property('parsed_message', 'test 127.0.0.1'); - expect(value).to.have.property('parsed_timestamp', TEST_TIMESTAMP); + expect(value).to.have.property('attributes.parsed_level', 'error'); + expect(value).to.have.property('attributes.parsed_message', 'test 127.0.0.1'); + expect(value).to.have.property('attributes.parsed_timestamp', TEST_TIMESTAMP); }); it('should return processor metrics', async () => { @@ -217,8 +218,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { { id: 'draft', grok: { - field: 'parsed_message', - patterns: ['%{TIMESTAMP_ISO8601:other_date}'], // This processor will fail, as won't match another date from the remaining message + field: 'attributes.parsed_message', + patterns: ['%{TIMESTAMP_ISO8601:attributes.other_date}'], // This processor will fail, as won't match another date from the remaining message if: { always: {} }, }, }, @@ -231,9 +232,9 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const grokMetrics = processorsMetrics.draft; expect(dissectMetrics.detected_fields).to.eql([ - 'parsed_level', - 'parsed_message', - 'parsed_timestamp', + 'attributes.parsed_level', + 'attributes.parsed_message', + 'attributes.parsed_timestamp', ]); expect(dissectMetrics.errors).to.eql([]); expect(dissectMetrics.failed_rate).to.be(0); @@ -259,8 +260,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { { id: 'draft', grok: { - field: 'parsed_message', - patterns: ['%{IP:parsed_ip}'], + field: 'attributes.parsed_message', + patterns: ['%{IP:attributes.parsed_ip}'], if: { always: {} }, }, }, @@ -299,7 +300,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { ...basicDissectProcessor, dissect: { ...basicDissectProcessor.dissect, - if: { field: 'message', operator: 'contains', value: 'test' }, + if: { field: 'body.text', operator: 'contains', value: 'test' }, }, }, ], @@ -333,8 +334,10 @@ export default 
function ({ getService }: DeploymentAgnosticFtrProviderContext) { { id: 'draft', grok: { - field: 'parsed_message', - patterns: ['%{WORD:ignored_field} %{IP:parsed_ip} %{GREEDYDATA:parsed_message}'], // Try overriding parsed_message previously computed by dissect + field: 'attributes.parsed_message', + patterns: [ + '%{WORD:attributes.ignored_field} %{IP:attributes.parsed_ip} %{GREEDYDATA:attributes.parsed_message}', + ], // Try overriding parsed_message previously computed by dissect if: { always: {} }, }, }, @@ -348,14 +351,14 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const { detected_fields, status, value } = response.body.documents[0]; expect(status).to.be('parsed'); expect(detected_fields).to.eql([ - { processor_id: 'dissect-uuid', name: 'parsed_level' }, - { processor_id: 'dissect-uuid', name: 'parsed_message' }, - { processor_id: 'dissect-uuid', name: 'parsed_timestamp' }, - { processor_id: 'draft', name: 'ignored_field' }, - { processor_id: 'draft', name: 'parsed_ip' }, - { processor_id: 'draft', name: 'parsed_message' }, + { processor_id: 'dissect-uuid', name: 'attributes.parsed_level' }, + { processor_id: 'dissect-uuid', name: 'attributes.parsed_message' }, + { processor_id: 'dissect-uuid', name: 'attributes.parsed_timestamp' }, + { processor_id: 'draft', name: 'attributes.ignored_field' }, + { processor_id: 'draft', name: 'attributes.parsed_ip' }, + { processor_id: 'draft', name: 'attributes.parsed_message' }, ]); - expect(value).to.have.property('parsed_message', 'greedy data message'); + expect(value).to.have.property('attributes.parsed_message', 'greedy data message'); }); it('should gracefully return the errors for each partially parsed or failed document', async () => { @@ -365,8 +368,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { { id: 'draft', grok: { - field: 'parsed_message', - patterns: ['%{TIMESTAMP_ISO8601:other_date}'], // This processor will fail, as won't 
match another date from the remaining message + field: 'attributes.parsed_message', + patterns: ['%{TIMESTAMP_ISO8601:attributes.other_date}'], // This processor will fail, as won't match another date from the remaining message if: { always: {} }, }, }, @@ -391,7 +394,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { { id: 'draft', grok: { - field: 'message', + field: 'body.text', patterns: ['%{INVALID_PATTERN:field}'], if: { always: {} }, }, @@ -413,22 +416,49 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { ]); }); + it('should gracefully return errors related to non-namespaced fields', async () => { + const response = await simulateProcessingForStream(apiClient, 'logs.test', { + processing: [ + { + id: 'draft', + grok: { + field: 'body.text', + patterns: ['%{WORD:abc}'], + if: { always: {} }, + }, + }, + ], + documents: [createTestDocument('test message')], + }); + + const processorsMetrics = response.body.processors_metrics; + const grokMetrics = processorsMetrics.draft; + + expect(grokMetrics.errors).to.eql([ + { + processor_id: 'draft', + type: 'non_namespaced_fields_failure', + message: 'The fields generated by the processor are not namespaced ECS fields: [abc]', + }, + ]); + }); + it('should gracefully return non-additive simulation errors', async () => { const response = await simulateProcessingForStream(apiClient, 'logs.test', { processing: [ { id: 'draft', grok: { - field: 'message', + field: 'body.text', patterns: [ // This overwrite the exising log.level and message values - '%{TIMESTAMP_ISO8601:parsed_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message}', + '%{TIMESTAMP_ISO8601:attributes.parsed_timestamp} %{LOGLEVEL:severity_text} %{GREEDYDATA:body.text}', ], if: { always: {} }, }, }, ], - documents: [{ ...createTestDocument(), 'log.level': 'info' }], + documents: [{ ...createTestDocument(), severity_text: 'info' }], }); const processorsMetrics = 
response.body.processors_metrics; @@ -439,7 +469,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { processor_id: 'draft', type: 'non_additive_processor_failure', message: - 'The processor is not additive to the documents. It might update fields [log.level,message]', + 'The processor is not additive to the documents. It might update fields [body.text,severity_text]', }, ]); // Non-additive changes are not counted as error @@ -453,7 +483,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { { id: 'draft', grok: { - field: 'message', + field: 'body.text', patterns: ['%{TIMESTAMP_ISO8601:@timestamp}'], if: { always: {} }, }, @@ -485,7 +515,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { processing: [basicGrokProcessor], documents: [createTestDocument()], detected_fields: [ - { name: 'parsed_timestamp', type: 'boolean' }, // Incompatible type + { name: 'attributes.parsed_timestamp', type: 'boolean' }, // Incompatible type ], } ); @@ -493,7 +523,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { expect(detectedFieldsFailureResponse.body.documents[0].errors).to.eql([ { type: 'field_mapping_failure', - message: `Some field types might not be compatible with this document: [1:44] failed to parse field [parsed_timestamp] of type [boolean] in document with id '0'. Preview of field's value: '${TEST_TIMESTAMP}'`, + message: `Some field types might not be compatible with this document: [1:98] failed to parse field [attributes.parsed_timestamp] of type [boolean] in document with id '0'. 
Preview of field's value: '${TEST_TIMESTAMP}'`, }, ]); expect(detectedFieldsFailureResponse.body.documents[0].status).to.be('failed'); @@ -510,16 +540,16 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { { id: 'draft', grok: { - field: 'message', + field: 'body.text', patterns: [ // This overwrite the exising log.level and message values - '%{TIMESTAMP_ISO8601:parsed_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message}', + '%{TIMESTAMP_ISO8601:attributes.parsed_timestamp} %{LOGLEVEL:severity_text} %{GREEDYDATA:attributes.message}', ], if: { always: {} }, }, }, ], - documents: [{ ...createTestDocument(), 'log.level': 'info' }], + documents: [{ ...createTestDocument(), severity_text: 'info' }], }), ]); diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/queries.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/queries.ts index 2440e00433d53..a091b05d05bfc 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/queries.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/queries.ts @@ -28,7 +28,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { wired: { routing: [], fields: { - numberfield: { + 'attributes.numberfield': { type: 'long', }, }, diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/root_stream.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/root_stream.ts index 44e2f2bbc603c..a718e307f0656 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/root_stream.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/root_stream.ts @@ -27,13 +27,49 @@ const rootStreamDefinition: Streams.WiredStream.Definition = { '@timestamp': { type: 'date', }, - message: { - type: 'match_only_text', + 'scope.dropped_attributes_count': { + type: 'long', + 
}, + dropped_attributes_count: { + type: 'long', + }, + 'resource.dropped_attributes_count': { + type: 'long', + }, + 'resource.schema_url': { + type: 'keyword', + }, + 'scope.name': { + type: 'keyword', + }, + 'scope.schema_url': { + type: 'keyword', + }, + 'scope.version': { + type: 'keyword', }, - 'host.name': { + observed_timestamp: { + type: 'date', + }, + trace_id: { + type: 'keyword', + }, + span_id: { + type: 'keyword', + }, + event_name: { type: 'keyword', }, - 'log.level': { + severity_text: { + type: 'keyword', + }, + 'body.text': { + type: 'match_only_text', + }, + severity_number: { + type: 'long', + }, + 'resource.attributes.host.name': { type: 'keyword', }, 'stream.name': { @@ -70,9 +106,9 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { processing: [ { grok: { - field: 'message', + field: 'body.text', patterns: [ - '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + '%{TIMESTAMP_ISO8601:attributes.inner_timestamp} %{LOGLEVEL:severity_text} %{GREEDYDATA:attributes.message2}', ], if: { always: {} }, }, diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/schema.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/schema.ts index 2054ebc7ee4fd..2f6e270faf08d 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/schema.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/schema.ts @@ -51,7 +51,11 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }) .expect(200); - expect(response.body.unmappedFields).to.eql(['another.field', 'lastField', 'some.field']); + expect(response.body.unmappedFields).to.eql([ + 'attributes.another.field', + 'attributes.lastField', + 'attributes.some.field', + ]); }); }); @@ -65,7 +69,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { name: 'logs', }, body: { - 
field_definitions: [{ name: 'message', type: 'boolean' }], + field_definitions: [{ name: 'body.text', type: 'boolean' }], }, }, } @@ -84,7 +88,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { name: 'logs', }, body: { - field_definitions: [{ name: 'message', type: 'keyword' }], + field_definitions: [{ name: 'body.text', type: 'keyword' }], }, }, } @@ -100,7 +104,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { name: 'logs.nginx', }, if: { - field: 'log.logger', + field: 'attributes.log.logger', operator: 'eq' as const, value: 'nginx', }, @@ -115,7 +119,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { name: 'logs.nginx', }, body: { - field_definitions: [{ name: 'message', type: 'keyword' }], + field_definitions: [{ name: 'body.text', type: 'keyword' }], }, }, } diff --git a/x-pack/test_serverless/functional/test_suites/observability/streams/read_privilege.ts b/x-pack/test_serverless/functional/test_suites/observability/streams/read_privilege.ts index 63910f868498e..f0bead574bd69 100644 --- a/x-pack/test_serverless/functional/test_suites/observability/streams/read_privilege.ts +++ b/x-pack/test_serverless/functional/test_suites/observability/streams/read_privilege.ts @@ -31,7 +31,7 @@ const request: Streams.WiredStream.UpsertRequest = { wired: { routing: [], fields: { - numberfield: { + 'attributes.numberfield': { type: 'long', }, },