diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts new file mode 100644 index 0000000000000..de4b995f5371c --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts @@ -0,0 +1,547 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Streams } from '@kbn/streams-schema'; +import { ClassicStream } from './classic_stream'; +import type { StateDependencies, StreamChange } from '../types'; +import type { State } from '../state'; +import type { StreamChangeStatus } from '../stream_active_record/stream_active_record'; + +interface ClassicStreamChanges { + processing: boolean; + field_overrides: boolean; + failure_store: boolean; + lifecycle: boolean; + settings: boolean; + query_streams: boolean; +} + +interface ClassicStreamTestable { + _changes: ClassicStreamChanges; + doHandleUpsertChange( + definition: Streams.all.Definition, + desiredState: State, + startingState: State + ): Promise<{ cascadingChanges: StreamChange[]; changeStatus: StreamChangeStatus }>; +} + +describe('ClassicStream', () => { + const createMockDependencies = (): StateDependencies => + ({ + logger: { debug: jest.fn(), info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + isServerless: false, + isDev: false, + scopedClusterClient: { + asCurrentUser: { + indices: { + getDataStreamSettings: jest.fn().mockResolvedValue({ + data_streams: [ + { + name: 'logs-test-default', + effective_settings: { + index: {}, + }, + }, + ], + }), + }, + }, + }, + } as unknown as StateDependencies); + + const createMockState = ( + streams: Map = new Map() + ): State => + ({ + get: (name: string) => streams.get(name), + has: (name: string) => streams.has(name), + all: () => Array.from(streams.values()), + } as unknown as State); + + const createBaseClassicStreamDefinition = ( + overrides: Partial = {} + ): Streams.ClassicStream.Definition => ({ + name: 'logs-test-default', + description: 'Test stream', + updated_at: new Date().toISOString(), + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { + field_overrides: undefined, + }, + failure_store: { inherit: {} }, + }, + ...overrides, + }); + + describe('doHandleUpsertChange - _changes flags for new streams', () => { + it('sets processing to false when processing.steps is empty for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.processing).toBe(false); + }); + + it('sets processing to true when processing.steps is non-empty for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { + steps: [ + { action: 'grok', from: 'body.text', patterns: ['%{GREEDYDATA:attributes.data}'] }, + ], + updated_at: new Date().toISOString(), + }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.processing).toBe(true); + }); + + it('sets lifecycle to false when using inherit lifecycle for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect(stream.hasChangedLifecycle()).toBe(false); + }); + + it('sets lifecycle to true when using non-inherit lifecycle for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { dsl: { data_retention: '30d' } }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect(stream.hasChangedLifecycle()).toBe(true); + }); + + it('sets settings to false when settings is empty for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.settings).toBe(false); + }); + + it('sets settings to true when settings is non-empty for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: { 'index.refresh_interval': { value: '5s' } }, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.settings).toBe(true); + }); + + it('sets field_overrides to false when field_overrides is undefined for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.field_overrides).toBe(false); + }); + + it('sets field_overrides to false when field_overrides is empty object for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: {} }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.field_overrides).toBe(false); + }); + + it('sets field_overrides to true when field_overrides is non-empty for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: { test_field: { type: 'keyword' } } }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.field_overrides).toBe(true); + }); + + it('sets failure_store to false when using inherit failure_store for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.failure_store).toBe(false); + }); + + it('sets failure_store to true when using non-inherit failure_store for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { lifecycle: { enabled: { data_retention: '7d' } } }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.failure_store).toBe(true); + }); + + it('sets all _changes to false when creating stream with all empty/default values', async () => { + const definition = createBaseClassicStreamDefinition(); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.processing).toBe(false); + expect(stream.hasChangedLifecycle()).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.settings).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.field_overrides).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.failure_store).toBe(false); + }); + }); + + describe('doHandleUpsertChange - _changes flags for existing streams', () => { + it('sets processing to true when processing changed for existing stream', async () => { + const existingDefinition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const newDefinition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { + steps: [ + { action: 'grok', from: 'body.text', patterns: ['%{GREEDYDATA:attributes.data}'] }, + ], + updated_at: new Date().toISOString(), + }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(existingDefinition, createMockDependencies()); + const startingState = createMockState( + new Map([['logs-test-default', { definition: existingDefinition }]]) + ); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + newDefinition, + startingState, + startingState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.processing).toBe(true); + }); + + it('sets processing to false when processing unchanged for existing stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { + steps: [ + { action: 'grok', from: 'body.text', patterns: ['%{GREEDYDATA:attributes.data}'] }, + ], + updated_at: '2024-01-01T00:00:00.000Z', + }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const startingState = createMockState(new Map([['logs-test-default', { definition }]])); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + startingState, + startingState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.processing).toBe(false); + }); + + it('sets lifecycle to true when lifecycle changed for existing stream', async () => { + const existingDefinition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const newDefinition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { dsl: { data_retention: '30d' } }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(existingDefinition, createMockDependencies()); + const startingState = createMockState( + new Map([['logs-test-default', { definition: existingDefinition }]]) + ); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + newDefinition, + startingState, + startingState + ); + + expect(stream.hasChangedLifecycle()).toBe(true); + }); + + it('sets lifecycle to false when lifecycle unchanged for existing stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { dsl: { data_retention: '30d' } }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const startingState = createMockState(new Map([['logs-test-default', { definition }]])); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + startingState, + startingState + ); + + expect(stream.hasChangedLifecycle()).toBe(false); + }); + + it('sets field_overrides to true when field_overrides changed for existing stream', async () => { + const existingDefinition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: { old_field: { type: 'keyword' } } }, + failure_store: { inherit: {} }, + }, + }); + + const newDefinition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: { new_field: { type: 'match_only_text' } } }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(existingDefinition, createMockDependencies()); + const startingState = createMockState( + new Map([['logs-test-default', { definition: existingDefinition }]]) + ); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + newDefinition, + startingState, + startingState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.field_overrides).toBe(true); + }); + + it('sets field_overrides to false when field_overrides unchanged for existing stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: { test_field: { type: 'keyword' } } }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const startingState = createMockState(new Map([['logs-test-default', { definition }]])); + + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + startingState, + startingState + ); + + expect((stream as unknown as ClassicStreamTestable)._changes.field_overrides).toBe(false); + }); + }); +}); diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts index 435ac368cc8c4..1ed4c0383e7c2 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts @@ -39,7 +39,12 @@ import type { } from '../stream_active_record/stream_active_record'; import { StreamActiveRecord } from '../stream_active_record/stream_active_record'; import type { StateDependencies, StreamChange } from '../types'; -import { formatSettings, settingsUpdateRequiresRollover, validateQueryStreams } from './helpers'; +import { + computeChange, + formatSettings, + settingsUpdateRequiresRollover, + validateQueryStreams, +} from './helpers'; import { validateSettings, validateSettingsWithDryRun } from './validate_settings'; interface ClassicStreamChanges extends StreamChanges { @@ -95,34 +100,58 @@ export class ClassicStream extends StreamActiveRecord 0, + hasChanged: () => + !_.isEqual( + _.omit(this._definition.ingest.processing, ['updated_at']), + _.omit(startingStateStreamDefinition!.ingest.processing, ['updated_at']) + ), + }); - this._changes.settings = - !startingStateStreamDefinition || - !_.isEqual(await this.getEffectiveSettings(), this._definition.ingest.settings); + this._changes.lifecycle = computeChange({ + isExistingStream, + hasMeaningfulValue: !isInheritLifecycle(this._definition.ingest.lifecycle), + hasChanged: () => + !_.isEqual( + this._definition.ingest.lifecycle, + startingStateStreamDefinition!.ingest.lifecycle + ), + }); - this._changes.field_overrides = - !startingStateStreamDefinition || - !_.isEqual( - this._definition.ingest.classic.field_overrides, - startingStateStreamDefinition.ingest.classic.field_overrides - ); + // Prefetch effective settings for existing streams to allow sync comparison + const effectiveSettings = isExistingStream ? await this.getEffectiveSettings() : undefined; + this._changes.settings = computeChange({ + isExistingStream, + hasMeaningfulValue: Object.keys(this._definition.ingest.settings || {}).length > 0, + hasChanged: () => !_.isEqual(effectiveSettings, this._definition.ingest.settings), + }); - this._changes.failure_store = - !startingStateStreamDefinition || - !_.isEqual( - this._definition.ingest.failure_store, - startingStateStreamDefinition.ingest.failure_store - ); + this._changes.field_overrides = computeChange({ + isExistingStream, + hasMeaningfulValue: !!( + this._definition.ingest.classic.field_overrides && + Object.keys(this._definition.ingest.classic.field_overrides).length > 0 + ), + hasChanged: () => + !_.isEqual( + this._definition.ingest.classic.field_overrides, + startingStateStreamDefinition!.ingest.classic.field_overrides + ), + }); + + this._changes.failure_store = computeChange({ + isExistingStream, + hasMeaningfulValue: !isInheritFailureStore(this._definition.ingest.failure_store), + hasChanged: () => + !_.isEqual( + this._definition.ingest.failure_store, + startingStateStreamDefinition!.ingest.failure_store + ), + }); this._changes.query_streams = !startingStateStreamDefinition || diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/helpers.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/helpers.ts index 20504a4d48684..7e3741d2bf2b6 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/helpers.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/helpers.ts @@ -10,6 +10,29 @@ import type { BaseStream } from '@kbn/streams-schema/src/models/base'; import type { State } from '../state'; import type { ValidationResult } from '../stream_active_record/stream_active_record'; +interface ComputeChangeOptions { + /** Whether the stream already exists in the starting state */ + isExistingStream: boolean; + /** Whether the new value is meaningful (non-empty/non-default) */ + hasMeaningfulValue: boolean; + /** Whether the value changed compared to the starting state (only evaluated for existing streams) */ + hasChanged: () => boolean; +} + +/** + * Determines if a change flag should be set for a stream property. + * + * For existing streams (isExistingStream=true): returns true if the values are not equal (hasChanged) + * For new streams (isExistingStream=false): returns true if the value is meaningful/non-empty (hasMeaningfulValue) + */ +export function computeChange({ + isExistingStream, + hasMeaningfulValue, + hasChanged, +}: ComputeChangeOptions): boolean { + return isExistingStream ? hasChanged() : hasMeaningfulValue; +} + export function formatSettings(settings: IngestStreamSettings, isServerless: boolean) { if (isServerless) { return { diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.test.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.test.ts new file mode 100644 index 0000000000000..aa765a9f9f964 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.test.ts @@ -0,0 +1,495 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Streams } from '@kbn/streams-schema'; +import { WiredStream } from './wired_stream'; +import type { StateDependencies, StreamChange } from '../types'; +import type { State } from '../state'; +import type { StreamChangeStatus } from '../stream_active_record/stream_active_record'; + +interface WiredStreamChanges { + ownFields: boolean; + ownRouting: boolean; + routing: boolean; + processing: boolean; + lifecycle: boolean; + settings: boolean; + failure_store: boolean; +} + +interface WiredStreamTestable { + _changes: WiredStreamChanges; + doHandleUpsertChange( + definition: Streams.all.Definition, + desiredState: State, + startingState: State + ): Promise<{ cascadingChanges: StreamChange[]; changeStatus: StreamChangeStatus }>; +} + +describe('WiredStream', () => { + const createMockDependencies = (): StateDependencies => + ({ + logger: { debug: jest.fn(), info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + isServerless: false, + isDev: false, + } as unknown as StateDependencies); + + const createMockState = ( + streams: Map = new Map() + ): State => + ({ + get: (name: string) => streams.get(name), + has: (name: string) => streams.has(name), + all: () => Array.from(streams.values()), + } as unknown as State); + + const createBaseWiredStreamDefinition = ( + overrides: Partial = {} + ): Streams.WiredStream.Definition => ({ + name: 'logs.test', + description: 'Test stream', + updated_at: new Date().toISOString(), + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { + fields: {}, + routing: [], + }, + failure_store: { inherit: {} }, + }, + ...overrides, + }); + + describe('doHandleUpsertChange - _changes flags for new streams', () => { + it('sets ownFields to false when fields is empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect(stream.hasChangedFields()).toBe(false); + }); + + it('sets ownFields to true when fields is non-empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { + fields: { 'test.field': { type: 'keyword' } }, + routing: [], + }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect(stream.hasChangedFields()).toBe(true); + }); + + it('sets routing to false when routing is empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as WiredStreamTestable)._changes.routing).toBe(false); + }); + + it('sets routing to true when routing is non-empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { + fields: {}, + routing: [{ destination: 'logs.test.child', where: { never: {} }, status: 'enabled' }], + }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as WiredStreamTestable)._changes.routing).toBe(true); + }); + + it('sets processing to false when processing.steps is empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as WiredStreamTestable)._changes.processing).toBe(false); + }); + + it('sets processing to true when processing.steps is non-empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { + steps: [ + { action: 'grok', from: 'body.text', patterns: ['%{GREEDYDATA:attributes.data}'] }, + ], + updated_at: new Date().toISOString(), + }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect((stream as unknown as WiredStreamTestable)._changes.processing).toBe(true); + }); + + it('sets lifecycle to false when using inherit lifecycle for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect(stream.hasChangedLifecycle()).toBe(false); + }); + + it('sets lifecycle to true when using non-inherit lifecycle for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { dsl: { data_retention: '30d' } }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect(stream.hasChangedLifecycle()).toBe(true); + }); + + it('sets settings to false when settings is empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect(stream.hasChangedSettings()).toBe(false); + }); + + it('sets settings to true when settings is non-empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: { 'index.refresh_interval': { value: '5s' } }, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect(stream.hasChangedSettings()).toBe(true); + }); + + it('sets failure_store to false when using inherit failure_store for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect(stream.hasChangedFailureStore()).toBe(false); + }); + + it('sets failure_store to true when using non-inherit failure_store for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { lifecycle: { enabled: { data_retention: '7d' } } }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect(stream.hasChangedFailureStore()).toBe(true); + }); + + it('sets all _changes to false when creating stream with all empty/default values', async () => { + const definition = createBaseWiredStreamDefinition(); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); + + expect(stream.hasChangedFields()).toBe(false); + expect((stream as unknown as WiredStreamTestable)._changes.routing).toBe(false); + expect((stream as unknown as WiredStreamTestable)._changes.processing).toBe(false); + expect(stream.hasChangedLifecycle()).toBe(false); + expect(stream.hasChangedSettings()).toBe(false); + expect(stream.hasChangedFailureStore()).toBe(false); + }); + }); + + describe('doHandleUpsertChange - _changes flags for existing streams', () => { + it('sets ownFields to true when fields changed for existing stream', async () => { + const existingDefinition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: { 'old.field': { type: 'keyword' } }, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const newDefinition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: { 'new.field': { type: 'match_only_text' } }, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(existingDefinition, createMockDependencies()); + const startingState = createMockState( + new Map([['logs.test', { definition: existingDefinition }]]) + ); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + newDefinition, + startingState, + startingState + ); + + expect(stream.hasChangedFields()).toBe(true); + }); + + it('sets ownFields to false when fields unchanged for existing stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: { 'test.field': { type: 'keyword' } }, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const startingState = createMockState(new Map([['logs.test', { definition }]])); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + startingState, + startingState + ); + + expect(stream.hasChangedFields()).toBe(false); + }); + + it('sets lifecycle to true when lifecycle changed for existing stream', async () => { + const existingDefinition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const newDefinition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { dsl: { data_retention: '30d' } }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(existingDefinition, createMockDependencies()); + const startingState = createMockState( + new Map([['logs.test', { definition: existingDefinition }]]) + ); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + newDefinition, + startingState, + startingState + ); + + expect(stream.hasChangedLifecycle()).toBe(true); + }); + + it('sets lifecycle to false when lifecycle unchanged for existing stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { dsl: { data_retention: '30d' } }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const startingState = createMockState(new Map([['logs.test', { definition }]])); + + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + startingState, + startingState + ); + + expect(stream.hasChangedLifecycle()).toBe(false); + }); + }); +}); diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts index c12a5a4a3ef5a..b1f691dda2e96 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts @@ -63,7 +63,12 @@ import type { } from '../stream_active_record/stream_active_record'; import { StreamActiveRecord } from '../stream_active_record/stream_active_record'; import { hasSupportedStreamsRoot } from '../../root_stream_definition'; -import { formatSettings, settingsUpdateRequiresRollover, validateQueryStreams } from './helpers'; +import { + computeChange, + formatSettings, + settingsUpdateRequiresRollover, + validateQueryStreams, +} from './helpers'; import { validateSettings, validateSettingsWithDryRun } from './validate_settings'; interface WiredStreamChanges extends StreamChanges { @@ -128,41 +133,69 @@ export class WiredStream extends StreamActiveRecord 0, + hasChanged: () => + !_.isEqual( + this._definition.ingest.wired.fields, + startingStateStreamDefinition!.ingest.wired.fields + ), + }); - this._changes.routing = - !startingStateStreamDefinition || - !_.isEqual( - this._definition.ingest.wired.routing, - startingStateStreamDefinition.ingest.wired.routing - ); + this._changes.routing = computeChange({ + isExistingStream, + hasMeaningfulValue: (this._definition.ingest.wired.routing || []).length > 0, + hasChanged: () => + !_.isEqual( + this._definition.ingest.wired.routing, + startingStateStreamDefinition!.ingest.wired.routing + ), + }); - this._changes.failure_store = - !startingStateStreamDefinition || - !_.isEqual( - this._definition.ingest.failure_store, - startingStateStreamDefinition.ingest.failure_store - ); + this._changes.failure_store = computeChange({ + isExistingStream, + hasMeaningfulValue: !isInheritFailureStore(this._definition.ingest.failure_store), + hasChanged: () => + !_.isEqual( + this._definition.ingest.failure_store, + startingStateStreamDefinition!.ingest.failure_store + ), + }); - this._changes.processing = - !startingStateStreamDefinition || - !_.isEqual( - _.omit(this._definition.ingest.processing, ['updated_at']), - _.omit(startingStateStreamDefinition.ingest.processing, ['updated_at']) - ); + this._changes.processing = computeChange({ + isExistingStream, + hasMeaningfulValue: (this._definition.ingest.processing.steps || []).length > 0, + hasChanged: () => + !_.isEqual( + _.omit(this._definition.ingest.processing, ['updated_at']), + _.omit(startingStateStreamDefinition!.ingest.processing, ['updated_at']) + ), + }); - this._changes.lifecycle = - !startingStateStreamDefinition || - !_.isEqual(this._definition.ingest.lifecycle, startingStateStreamDefinition.ingest.lifecycle); + this._changes.lifecycle = computeChange({ + isExistingStream, + hasMeaningfulValue: !isInheritLifecycle(this._definition.ingest.lifecycle), + hasChanged: () => + !_.isEqual( + this._definition.ingest.lifecycle, + startingStateStreamDefinition!.ingest.lifecycle + ), + }); - this._changes.settings = - !startingStateStreamDefinition || - !_.isEqual(this._definition.ingest.settings, startingStateStreamDefinition.ingest.settings); + this._changes.settings = computeChange({ + isExistingStream, + hasMeaningfulValue: Object.keys(this._definition.ingest.settings || {}).length > 0, + hasChanged: () => + !_.isEqual( + this._definition.ingest.settings, + startingStateStreamDefinition!.ingest.settings + ), + }); this._changes.query_streams = !startingStateStreamDefinition ||