From c6ea60257a374ba6d9cc2d5c9484a7cf7732c888 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Wed, 28 Jan 2026 19:44:48 +0100 Subject: [PATCH 1/5] [Streams] Only mark _changes as true when values are meaningful Fixes #241300 When creating a new stream with empty/default values for properties like `fields`, `processing.steps`, `routing`, etc., the change tracking logic was unconditionally marking all properties as changed. This caused unnecessary validation and ES actions (like rollovers) to execute. This fix updates the `doHandleUpsertChange` method in both WiredStream and ClassicStream to only mark a property as changed when: - For new streams: the value is non-empty/non-default - For existing streams: the value actually differs from the previous state Properties affected: - WiredStream: ownFields, routing, processing, lifecycle, settings, failure_store - ClassicStream: processing, lifecycle, settings, field_overrides, failure_store Co-authored-by: Cursor --- .../streams/classic_stream.test.ts | 456 ++++++++++++++++++ .../streams/classic_stream.ts | 61 ++- .../streams/wired_stream.test.ts | 407 ++++++++++++++++ .../state_management/streams/wired_stream.ts | 81 ++-- 4 files changed, 946 insertions(+), 59 deletions(-) create mode 100644 x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts create mode 100644 x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.test.ts diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts new file mode 100644 index 0000000000000..8f7f24f7cba67 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts @@ -0,0 +1,456 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Streams } from '@kbn/streams-schema'; +import { ClassicStream } from './classic_stream'; +import type { StateDependencies } from '../types'; +import type { State } from '../state'; + +describe('ClassicStream', () => { + const createMockDependencies = (): StateDependencies => + ({ + logger: { debug: jest.fn(), info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + isServerless: false, + isDev: false, + scopedClusterClient: { + asCurrentUser: { + indices: { + getDataStreamSettings: jest.fn().mockResolvedValue({ + data_streams: [ + { + name: 'logs-test-default', + effective_settings: { + index: {}, + }, + }, + ], + }), + }, + }, + }, + } as unknown as StateDependencies); + + const createMockState = ( + streams: Map = new Map() + ): State => + ({ + get: (name: string) => streams.get(name), + has: (name: string) => streams.has(name), + all: () => Array.from(streams.values()), + } as unknown as State); + + const createBaseClassicStreamDefinition = ( + overrides: Partial = {} + ): Streams.ClassicStream.Definition => ({ + name: 'logs-test-default', + description: 'Test stream', + updated_at: new Date().toISOString(), + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { + field_overrides: undefined, + }, + failure_store: { inherit: {} }, + }, + ...overrides, + }); + + describe('doHandleUpsertChange - _changes flags for new streams', () => { + it('sets processing to false when processing.steps is empty for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.processing).toBe(false); + }); + + it('sets processing to true when processing.steps is non-empty for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { + steps: [ + { action: 'grok', from: 'body.text', patterns: ['%{GREEDYDATA:attributes.data}'] }, + ], + updated_at: new Date().toISOString(), + }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.processing).toBe(true); + }); + + it('sets lifecycle to false when using inherit lifecycle for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect(stream.hasChangedLifecycle()).toBe(false); + }); + + it('sets lifecycle to true when using non-inherit lifecycle for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { dsl: { data_retention: '30d' } }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect(stream.hasChangedLifecycle()).toBe(true); + }); + + it('sets settings to false when settings is empty for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.settings).toBe(false); + }); + + it('sets settings to true when settings is non-empty for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: { 'index.refresh_interval': { value: '5s' } }, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.settings).toBe(true); + }); + + it('sets field_overrides to false when field_overrides is undefined for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.field_overrides).toBe(false); + }); + + it('sets field_overrides to false when field_overrides is empty object for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: {} }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.field_overrides).toBe(false); + }); + + it('sets field_overrides to true when field_overrides is non-empty for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: { test_field: { type: 'keyword' } } }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.field_overrides).toBe(true); + }); + + it('sets failure_store to false when using inherit failure_store for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.failure_store).toBe(false); + }); + + it('sets failure_store to true when using non-inherit failure_store for new stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { lifecycle: { enabled: { data_retention: '7d' } } }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.failure_store).toBe(true); + }); + + it('sets all _changes to false when creating stream with all empty/default values', async () => { + const definition = createBaseClassicStreamDefinition(); + + const stream = new ClassicStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.processing).toBe(false); + expect(stream.hasChangedLifecycle()).toBe(false); + expect((stream as any)._changes.settings).toBe(false); + expect((stream as any)._changes.field_overrides).toBe(false); + expect((stream as any)._changes.failure_store).toBe(false); + }); + }); + + describe('doHandleUpsertChange - _changes flags for existing streams', () => { + it('sets processing to true when processing changed for existing stream', async () => { + const existingDefinition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const newDefinition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { + steps: [ + { action: 'grok', from: 'body.text', patterns: ['%{GREEDYDATA:attributes.data}'] }, + ], + updated_at: new Date().toISOString(), + }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(existingDefinition, createMockDependencies()); + const startingState = createMockState( + new Map([['logs-test-default', { definition: existingDefinition }]]) + ); + + await (stream as any).doHandleUpsertChange(newDefinition, startingState, startingState); + + expect((stream as any)._changes.processing).toBe(true); + }); + + it('sets processing to false when processing unchanged for existing stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { + steps: [ + { action: 'grok', from: 'body.text', patterns: ['%{GREEDYDATA:attributes.data}'] }, + ], + updated_at: '2024-01-01T00:00:00.000Z', + }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const startingState = createMockState(new Map([['logs-test-default', { definition }]])); + + await (stream as any).doHandleUpsertChange(definition, startingState, startingState); + + expect((stream as any)._changes.processing).toBe(false); + }); + + it('sets lifecycle to true when lifecycle changed for existing stream', async () => { + const existingDefinition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const newDefinition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { dsl: { data_retention: '30d' } }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(existingDefinition, createMockDependencies()); + const startingState = createMockState( + new Map([['logs-test-default', { definition: existingDefinition }]]) + ); + + await (stream as any).doHandleUpsertChange(newDefinition, startingState, startingState); + + expect(stream.hasChangedLifecycle()).toBe(true); + }); + + it('sets lifecycle to false when lifecycle unchanged for existing stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { dsl: { data_retention: '30d' } }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: undefined }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const startingState = createMockState(new Map([['logs-test-default', { definition }]])); + + await (stream as any).doHandleUpsertChange(definition, startingState, startingState); + + expect(stream.hasChangedLifecycle()).toBe(false); + }); + + it('sets field_overrides to true when field_overrides changed for existing stream', async () => { + const existingDefinition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: { old_field: { type: 'keyword' } } }, + failure_store: { inherit: {} }, + }, + }); + + const newDefinition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: { new_field: { type: 'match_only_text' } } }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(existingDefinition, createMockDependencies()); + const startingState = createMockState( + new Map([['logs-test-default', { definition: existingDefinition }]]) + ); + + await (stream as any).doHandleUpsertChange(newDefinition, startingState, startingState); + + expect((stream as any)._changes.field_overrides).toBe(true); + }); + + it('sets field_overrides to false when field_overrides unchanged for existing stream', async () => { + const definition = createBaseClassicStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + classic: { field_overrides: { test_field: { type: 'keyword' } } }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new ClassicStream(definition, createMockDependencies()); + const startingState = createMockState(new Map([['logs-test-default', { definition }]])); + + await (stream as any).doHandleUpsertChange(definition, startingState, startingState); + + expect((stream as any)._changes.field_overrides).toBe(false); + }); + }); +}); diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts index 25775a7a89ee0..73cb039b9ab46 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts @@ -92,34 +92,47 @@ export class ClassicStream extends StreamActiveRecord 0; + this._changes.processing = startingStateStreamDefinition + ? !_.isEqual( + _.omit(this._definition.ingest.processing, ['updated_at']), + _.omit(startingStateStreamDefinition.ingest.processing, ['updated_at']) + ) + : hasProcessingSteps; - this._changes.lifecycle = - !startingStateStreamDefinition || - !_.isEqual(this._definition.ingest.lifecycle, startingStateStreamDefinition.ingest.lifecycle); + const hasNonInheritLifecycle = !isInheritLifecycle(this._definition.ingest.lifecycle); + this._changes.lifecycle = startingStateStreamDefinition + ? !_.isEqual( + this._definition.ingest.lifecycle, + startingStateStreamDefinition.ingest.lifecycle + ) + : hasNonInheritLifecycle; - this._changes.settings = - !startingStateStreamDefinition || - !_.isEqual(await this.getEffectiveSettings(), this._definition.ingest.settings); + const hasSettings = Object.keys(this._definition.ingest.settings || {}).length > 0; + this._changes.settings = startingStateStreamDefinition + ? !_.isEqual(await this.getEffectiveSettings(), this._definition.ingest.settings) + : hasSettings; - this._changes.field_overrides = - !startingStateStreamDefinition || - !_.isEqual( - this._definition.ingest.classic.field_overrides, - startingStateStreamDefinition.ingest.classic.field_overrides - ); + const hasFieldOverrides = !!( + this._definition.ingest.classic.field_overrides && + Object.keys(this._definition.ingest.classic.field_overrides).length > 0 + ); + this._changes.field_overrides = startingStateStreamDefinition + ? !_.isEqual( + this._definition.ingest.classic.field_overrides, + startingStateStreamDefinition.ingest.classic.field_overrides + ) + : hasFieldOverrides; - this._changes.failure_store = - !startingStateStreamDefinition || - !_.isEqual( - this._definition.ingest.failure_store, - startingStateStreamDefinition.ingest.failure_store - ); + const hasNonInheritFailureStore = !isInheritFailureStore(this._definition.ingest.failure_store); + this._changes.failure_store = startingStateStreamDefinition + ? !_.isEqual( + this._definition.ingest.failure_store, + startingStateStreamDefinition.ingest.failure_store + ) + : hasNonInheritFailureStore; // The newly upserted definition will always have a new updated_at timestamp. But, if processing didn't change, // we should keep the existing updated_at as processing wasn't touched. diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.test.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.test.ts new file mode 100644 index 0000000000000..3c1344954f1a9 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.test.ts @@ -0,0 +1,407 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Streams } from '@kbn/streams-schema'; +import { WiredStream } from './wired_stream'; +import type { StateDependencies } from '../types'; +import type { State } from '../state'; + +describe('WiredStream', () => { + const createMockDependencies = (): StateDependencies => + ({ + logger: { debug: jest.fn(), info: jest.fn(), warn: jest.fn(), error: jest.fn() }, + isServerless: false, + isDev: false, + } as unknown as StateDependencies); + + const createMockState = ( + streams: Map = new Map() + ): State => + ({ + get: (name: string) => streams.get(name), + has: (name: string) => streams.has(name), + all: () => Array.from(streams.values()), + } as unknown as State); + + const createBaseWiredStreamDefinition = ( + overrides: Partial = {} + ): Streams.WiredStream.Definition => ({ + name: 'logs.test', + description: 'Test stream', + updated_at: new Date().toISOString(), + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { + fields: {}, + routing: [], + }, + failure_store: { inherit: {} }, + }, + ...overrides, + }); + + describe('doHandleUpsertChange - _changes flags for new streams', () => { + it('sets ownFields to false when fields is empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect(stream.hasChangedFields()).toBe(false); + }); + + it('sets ownFields to true when fields is non-empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { + fields: { 'test.field': { type: 'keyword' } }, + routing: [], + }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect(stream.hasChangedFields()).toBe(true); + }); + + it('sets routing to false when routing is empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.routing).toBe(false); + }); + + it('sets routing to true when routing is non-empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { + fields: {}, + routing: [{ destination: 'logs.test.child', where: { never: {} }, status: 'enabled' }], + }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.routing).toBe(true); + }); + + it('sets processing to false when processing.steps is empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.processing).toBe(false); + }); + + it('sets processing to true when processing.steps is non-empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { + steps: [ + { action: 'grok', from: 'body.text', patterns: ['%{GREEDYDATA:attributes.data}'] }, + ], + updated_at: new Date().toISOString(), + }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect((stream as any)._changes.processing).toBe(true); + }); + + it('sets lifecycle to false when using inherit lifecycle for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect(stream.hasChangedLifecycle()).toBe(false); + }); + + it('sets lifecycle to true when using non-inherit lifecycle for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { dsl: { data_retention: '30d' } }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect(stream.hasChangedLifecycle()).toBe(true); + }); + + it('sets settings to false when settings is empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect(stream.hasChangedSettings()).toBe(false); + }); + + it('sets settings to true when settings is non-empty for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: { 'index.refresh_interval': { value: '5s' } }, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect(stream.hasChangedSettings()).toBe(true); + }); + + it('sets failure_store to false when using inherit failure_store for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect(stream.hasChangedFailureStore()).toBe(false); + }); + + it('sets failure_store to true when using non-inherit failure_store for new stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { lifecycle: { enabled: { data_retention: '7d' } } }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect(stream.hasChangedFailureStore()).toBe(true); + }); + + it('sets all _changes to false when creating stream with all empty/default values', async () => { + const definition = createBaseWiredStreamDefinition(); + + const stream = new WiredStream(definition, createMockDependencies()); + const emptyState = createMockState(); + + await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + + expect(stream.hasChangedFields()).toBe(false); + expect((stream as any)._changes.routing).toBe(false); + expect((stream as any)._changes.processing).toBe(false); + expect(stream.hasChangedLifecycle()).toBe(false); + expect(stream.hasChangedSettings()).toBe(false); + expect(stream.hasChangedFailureStore()).toBe(false); + }); + }); + + describe('doHandleUpsertChange - _changes flags for existing streams', () => { + it('sets ownFields to true when fields changed for existing stream', async () => { + const existingDefinition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: { 'old.field': { type: 'keyword' } }, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const newDefinition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: { 'new.field': { type: 'match_only_text' } }, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(existingDefinition, createMockDependencies()); + const startingState = createMockState( + new Map([['logs.test', { definition: existingDefinition }]]) + ); + + await (stream as any).doHandleUpsertChange(newDefinition, startingState, startingState); + + expect(stream.hasChangedFields()).toBe(true); + }); + + it('sets ownFields to false when fields unchanged for existing stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: { 'test.field': { type: 'keyword' } }, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const startingState = createMockState(new Map([['logs.test', { definition }]])); + + await (stream as any).doHandleUpsertChange(definition, startingState, startingState); + + expect(stream.hasChangedFields()).toBe(false); + }); + + it('sets lifecycle to true when lifecycle changed for existing stream', async () => { + const existingDefinition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { inherit: {} }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const newDefinition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { dsl: { data_retention: '30d' } }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(existingDefinition, createMockDependencies()); + const startingState = createMockState( + new Map([['logs.test', { definition: existingDefinition }]]) + ); + + await (stream as any).doHandleUpsertChange(newDefinition, startingState, startingState); + + expect(stream.hasChangedLifecycle()).toBe(true); + }); + + it('sets lifecycle to false when lifecycle unchanged for existing stream', async () => { + const definition = createBaseWiredStreamDefinition({ + ingest: { + lifecycle: { dsl: { data_retention: '30d' } }, + processing: { steps: [], updated_at: new Date().toISOString() }, + settings: {}, + wired: { fields: {}, routing: [] }, + failure_store: { inherit: {} }, + }, + }); + + const stream = new WiredStream(definition, createMockDependencies()); + const startingState = createMockState(new Map([['logs.test', { definition }]])); + + await (stream as any).doHandleUpsertChange(definition, startingState, startingState); + + expect(stream.hasChangedLifecycle()).toBe(false); + }); + }); +}); diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts index add87717836ed..e133ea10b51ed 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts @@ -126,41 +126,52 @@ export class WiredStream extends StreamActiveRecord 0; + this._changes.ownFields = startingStateStreamDefinition + ? !_.isEqual( + this._definition.ingest.wired.fields, + startingStateStreamDefinition.ingest.wired.fields + ) + : hasFields; + + const hasRouting = (this._definition.ingest.wired.routing || []).length > 0; + this._changes.routing = startingStateStreamDefinition + ? !_.isEqual( + this._definition.ingest.wired.routing, + startingStateStreamDefinition.ingest.wired.routing + ) + : hasRouting; + + const hasNonInheritFailureStore = !isInheritFailureStore(this._definition.ingest.failure_store); + this._changes.failure_store = startingStateStreamDefinition + ? !_.isEqual( + this._definition.ingest.failure_store, + startingStateStreamDefinition.ingest.failure_store + ) + : hasNonInheritFailureStore; + + const hasProcessingSteps = (this._definition.ingest.processing.steps || []).length > 0; + this._changes.processing = startingStateStreamDefinition + ? !_.isEqual( + _.omit(this._definition.ingest.processing, ['updated_at']), + _.omit(startingStateStreamDefinition.ingest.processing, ['updated_at']) + ) + : hasProcessingSteps; + + const hasNonInheritLifecycle = !isInheritLifecycle(this._definition.ingest.lifecycle); + this._changes.lifecycle = startingStateStreamDefinition + ? !_.isEqual( + this._definition.ingest.lifecycle, + startingStateStreamDefinition.ingest.lifecycle + ) + : hasNonInheritLifecycle; + + const hasSettings = Object.keys(this._definition.ingest.settings || {}).length > 0; + this._changes.settings = startingStateStreamDefinition + ? !_.isEqual(this._definition.ingest.settings, startingStateStreamDefinition.ingest.settings) + : hasSettings; // The newly upserted definition will always have a new updated_at timestamp. But, if processing didn't change, // we should keep the existing updated_at as processing wasn't touched. From e0636177d8f4c519835aed7fd25c07fa9c8af1b4 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Fri, 30 Jan 2026 16:48:08 +0100 Subject: [PATCH 2/5] Refactor change detection logic with computeChange helper Extract repetitive pattern for determining if a stream property changed into a reusable helper function. This makes the code more DRY by abstracting the logic: for existing streams, check if values changed; for new streams, check if value is meaningful/non-empty. --- .../streams/classic_stream.ts | 71 ++++++++++-------- .../state_management/streams/helpers.ts | 18 +++++ .../state_management/streams/wired_stream.ts | 73 +++++++++++-------- 3 files changed, 103 insertions(+), 59 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts index 73cb039b9ab46..c30031c29062e 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts @@ -38,7 +38,7 @@ import type { } from '../stream_active_record/stream_active_record'; import { StreamActiveRecord } from '../stream_active_record/stream_active_record'; import type { StateDependencies, StreamChange } from '../types'; -import { formatSettings, settingsUpdateRequiresRollover } from './helpers'; +import { computeChange, formatSettings, settingsUpdateRequiresRollover } from './helpers'; import { validateSettings, validateSettingsWithDryRun } from './validate_settings'; interface ClassicStreamChanges extends StreamChanges { @@ -92,47 +92,58 @@ export class ClassicStream extends StreamActiveRecord 0; - this._changes.processing = startingStateStreamDefinition - ? !_.isEqual( + const isExistingStream = !!startingStateStreamDefinition; + + this._changes.processing = computeChange( + isExistingStream, + (this._definition.ingest.processing.steps || []).length > 0, + () => + !_.isEqual( _.omit(this._definition.ingest.processing, ['updated_at']), - _.omit(startingStateStreamDefinition.ingest.processing, ['updated_at']) + _.omit(startingStateStreamDefinition!.ingest.processing, ['updated_at']) ) - : hasProcessingSteps; + ); - const hasNonInheritLifecycle = !isInheritLifecycle(this._definition.ingest.lifecycle); - this._changes.lifecycle = startingStateStreamDefinition - ? !_.isEqual( + this._changes.lifecycle = computeChange( + isExistingStream, + !isInheritLifecycle(this._definition.ingest.lifecycle), + () => + !_.isEqual( this._definition.ingest.lifecycle, - startingStateStreamDefinition.ingest.lifecycle + startingStateStreamDefinition!.ingest.lifecycle ) - : hasNonInheritLifecycle; - - const hasSettings = Object.keys(this._definition.ingest.settings || {}).length > 0; - this._changes.settings = startingStateStreamDefinition - ? !_.isEqual(await this.getEffectiveSettings(), this._definition.ingest.settings) - : hasSettings; + ); - const hasFieldOverrides = !!( - this._definition.ingest.classic.field_overrides && - Object.keys(this._definition.ingest.classic.field_overrides).length > 0 + // Prefetch effective settings for existing streams to allow sync comparison + const effectiveSettings = isExistingStream ? await this.getEffectiveSettings() : undefined; + this._changes.settings = computeChange( + isExistingStream, + Object.keys(this._definition.ingest.settings || {}).length > 0, + () => !_.isEqual(effectiveSettings, this._definition.ingest.settings) ); - this._changes.field_overrides = startingStateStreamDefinition - ? !_.isEqual( + + this._changes.field_overrides = computeChange( + isExistingStream, + !!( + this._definition.ingest.classic.field_overrides && + Object.keys(this._definition.ingest.classic.field_overrides).length > 0 + ), + () => + !_.isEqual( this._definition.ingest.classic.field_overrides, - startingStateStreamDefinition.ingest.classic.field_overrides + startingStateStreamDefinition!.ingest.classic.field_overrides ) - : hasFieldOverrides; + ); - const hasNonInheritFailureStore = !isInheritFailureStore(this._definition.ingest.failure_store); - this._changes.failure_store = startingStateStreamDefinition - ? !_.isEqual( + this._changes.failure_store = computeChange( + isExistingStream, + !isInheritFailureStore(this._definition.ingest.failure_store), + () => + !_.isEqual( this._definition.ingest.failure_store, - startingStateStreamDefinition.ingest.failure_store + startingStateStreamDefinition!.ingest.failure_store ) - : hasNonInheritFailureStore; + ); // The newly upserted definition will always have a new updated_at timestamp. But, if processing didn't change, // we should keep the existing updated_at as processing wasn't touched. diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/helpers.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/helpers.ts index adb6394e7530c..3f5f06b84c51c 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/helpers.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/helpers.ts @@ -7,6 +7,24 @@ import type { IngestStreamSettings } from '@kbn/streams-schema'; +/** + * Determines if a change flag should be set for a stream property. + * + * For existing streams (isExistingStream=true): returns true if the values are not equal (hasChanged) + * For new streams (isExistingStream=false): returns true if the value is meaningful/non-empty (hasMeaningfulValue) + * + * @param isExistingStream - Whether the stream already exists in the starting state + * @param hasMeaningfulValue - Whether the new value is meaningful (non-empty/non-default) + * @param hasChanged - Whether the value changed compared to the starting state (only evaluated for existing streams) + */ +export function computeChange( + isExistingStream: boolean, + hasMeaningfulValue: boolean, + hasChanged: () => boolean +): boolean { + return isExistingStream ? hasChanged() : hasMeaningfulValue; +} + export function formatSettings(settings: IngestStreamSettings, isServerless: boolean) { if (isServerless) { return { diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts index e133ea10b51ed..2b0ebc83ef88c 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts @@ -63,7 +63,7 @@ import type { } from '../stream_active_record/stream_active_record'; import { StreamActiveRecord } from '../stream_active_record/stream_active_record'; import { hasSupportedStreamsRoot } from '../../root_stream_definition'; -import { formatSettings, settingsUpdateRequiresRollover } from './helpers'; +import { computeChange, formatSettings, settingsUpdateRequiresRollover } from './helpers'; import { validateSettings, validateSettingsWithDryRun } from './validate_settings'; interface WiredStreamChanges extends StreamChanges { @@ -128,50 +128,65 @@ export class WiredStream extends StreamActiveRecord 0; - this._changes.ownFields = startingStateStreamDefinition - ? !_.isEqual( + const isExistingStream = !!startingStateStreamDefinition; + + this._changes.ownFields = computeChange( + isExistingStream, + Object.keys(this._definition.ingest.wired.fields || {}).length > 0, + () => + !_.isEqual( this._definition.ingest.wired.fields, - startingStateStreamDefinition.ingest.wired.fields + startingStateStreamDefinition!.ingest.wired.fields ) - : hasFields; + ); - const hasRouting = (this._definition.ingest.wired.routing || []).length > 0; - this._changes.routing = startingStateStreamDefinition - ? !_.isEqual( + this._changes.routing = computeChange( + isExistingStream, + (this._definition.ingest.wired.routing || []).length > 0, + () => + !_.isEqual( this._definition.ingest.wired.routing, - startingStateStreamDefinition.ingest.wired.routing + startingStateStreamDefinition!.ingest.wired.routing ) - : hasRouting; + ); - const hasNonInheritFailureStore = !isInheritFailureStore(this._definition.ingest.failure_store); - this._changes.failure_store = startingStateStreamDefinition - ? !_.isEqual( + this._changes.failure_store = computeChange( + isExistingStream, + !isInheritFailureStore(this._definition.ingest.failure_store), + () => + !_.isEqual( this._definition.ingest.failure_store, - startingStateStreamDefinition.ingest.failure_store + startingStateStreamDefinition!.ingest.failure_store ) - : hasNonInheritFailureStore; + ); - const hasProcessingSteps = (this._definition.ingest.processing.steps || []).length > 0; - this._changes.processing = startingStateStreamDefinition - ? !_.isEqual( + this._changes.processing = computeChange( + isExistingStream, + (this._definition.ingest.processing.steps || []).length > 0, + () => + !_.isEqual( _.omit(this._definition.ingest.processing, ['updated_at']), - _.omit(startingStateStreamDefinition.ingest.processing, ['updated_at']) + _.omit(startingStateStreamDefinition!.ingest.processing, ['updated_at']) ) - : hasProcessingSteps; + ); - const hasNonInheritLifecycle = !isInheritLifecycle(this._definition.ingest.lifecycle); - this._changes.lifecycle = startingStateStreamDefinition - ? !_.isEqual( + this._changes.lifecycle = computeChange( + isExistingStream, + !isInheritLifecycle(this._definition.ingest.lifecycle), + () => + !_.isEqual( this._definition.ingest.lifecycle, - startingStateStreamDefinition.ingest.lifecycle + startingStateStreamDefinition!.ingest.lifecycle ) - : hasNonInheritLifecycle; + ); const hasSettings = Object.keys(this._definition.ingest.settings || {}).length > 0; - this._changes.settings = startingStateStreamDefinition - ? !_.isEqual(this._definition.ingest.settings, startingStateStreamDefinition.ingest.settings) - : hasSettings; + this._changes.settings = computeChange( + isExistingStream, + hasSettings, + () => + !_.isEqual(this._definition.ingest.settings, startingStateStreamDefinition!.ingest.settings) + ); // The newly upserted definition will always have a new updated_at timestamp. But, if processing didn't change, // we should keep the existing updated_at as processing wasn't touched. From 20f7d9214c93368c4cde6c4f590e6d5237f84de6 Mon Sep 17 00:00:00 2001 From: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Date: Mon, 2 Feb 2026 17:58:54 +0000 Subject: [PATCH 3/5] Changes from yarn openapi:bundle --- oas_docs/bundle.json | 182 +++++++++++++++++++++++++++++--- oas_docs/bundle.serverless.json | 182 +++++++++++++++++++++++++++++--- 2 files changed, 336 insertions(+), 28 deletions(-) diff --git a/oas_docs/bundle.json b/oas_docs/bundle.json index f8ef17734e065..5b24d56d09730 100644 --- a/oas_docs/bundle.json +++ b/oas_docs/bundle.json @@ -1222,7 +1222,7 @@ }, "/api/agent_builder/agents": { "get": { - "description": "List all available agents. Use this endpoint to retrieve complete agent information including their current configuration and assigned tools.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "List all available agents. Use this endpoint to retrieve complete agent information including their current configuration and assigned tools. To learn more, refer to the [agents documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/agent-builder-agents).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "get-agent-builder-agents", "parameters": [], "responses": { @@ -1293,7 +1293,7 @@ "x-state": "Added in 9.2.0" }, "post": { - "description": "Create a new agent. Use this endpoint to define the agent's behavior, appearance, and capabilities through comprehensive configuration options.

[Required authorization] Route required privileges: manage_agent_builder.", + "description": "Create a new agent. Use this endpoint to define the agent's behavior, appearance, and capabilities through comprehensive configuration options. To learn more, refer to the [agents documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/agent-builder-agents).

[Required authorization] Route required privileges: manage_agent_builder.", "operationId": "post-agent-builder-agents", "parameters": [ { @@ -1465,7 +1465,7 @@ }, "/api/agent_builder/agents/{id}": { "delete": { - "description": "Delete an agent by ID. This action cannot be undone.

[Required authorization] Route required privileges: manage_agent_builder.", + "description": "Delete an agent by ID. This action cannot be undone. To learn more, refer to the [agents documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/agent-builder-agents).

[Required authorization] Route required privileges: manage_agent_builder.", "operationId": "delete-agent-builder-agents-id", "parameters": [ { @@ -1512,7 +1512,7 @@ "x-state": "Added in 9.2.0" }, "get": { - "description": "Get a specific agent by ID. Use this endpoint to retrieve the complete agent definition including all configuration details and tool assignments.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "Get a specific agent by ID. Use this endpoint to retrieve the complete agent definition including all configuration details and tool assignments. To learn more, refer to the [agents documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/agent-builder-agents).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "get-agent-builder-agents-id", "parameters": [ { @@ -1571,7 +1571,7 @@ "x-state": "Added in 9.2.0" }, "put": { - "description": "Update an existing agent configuration. Use this endpoint to modify any aspect of the agent's behavior, appearance, or capabilities.

[Required authorization] Route required privileges: manage_agent_builder.", + "description": "Update an existing agent configuration. Use this endpoint to modify any aspect of the agent's behavior, appearance, or capabilities. To learn more, refer to the [agents documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/agent-builder-agents).

[Required authorization] Route required privileges: manage_agent_builder.", "operationId": "put-agent-builder-agents-id", "parameters": [ { @@ -2483,7 +2483,7 @@ }, "/api/agent_builder/converse": { "post": { - "description": "Send a message to an agent and receive a complete response. This synchronous endpoint waits for the agent to fully process your request before returning the final result. Use this for simple chat interactions where you need the complete response.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "Send a message to an agent and receive a complete response. This synchronous endpoint waits for the agent to fully process your request before returning the final result. Use this for simple chat interactions where you need the complete response. To learn more, refer to the [agent chat documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/chat).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "post-agent-builder-converse", "parameters": [ { @@ -2954,9 +2954,19 @@ }, "/api/agent_builder/mcp": { "post": { - "description": "> warn\n> This endpoint is designed for MCP clients (Claude Desktop, Cursor, VS Code, etc.) and should not be used directly via REST APIs. Use MCP Inspector or native MCP clients instead.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "> warn\n> This endpoint is designed for MCP clients (Claude Desktop, Cursor, VS Code, etc.) and should not be used directly via REST APIs. Use MCP Inspector or native MCP clients instead.\nTo learn more, refer to the [MCP documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/mcp-server).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "post-agent-builder-mcp", - "parameters": [], + "parameters": [ + { + "description": "Comma-separated list of namespaces to filter tools. Only tools matching the specified namespaces will be returned.", + "in": "query", + "name": "namespace", + "required": false, + "schema": { + "type": "string" + } + } + ], "requestBody": { "content": { "application/json": { @@ -3021,7 +3031,7 @@ }, "/api/agent_builder/tools": { "get": { - "description": "List all available tools. Use this endpoint to retrieve complete tool definitions including their schemas and configuration requirements.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "List all available tools. Use this endpoint to retrieve complete tool definitions including their schemas and configuration requirements. To learn more, refer to the [tools documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/tools).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "get-agent-builder-tools", "parameters": [], "responses": { @@ -3195,7 +3205,7 @@ "x-state": "Added in 9.2.0" }, "post": { - "description": "Create a new tool. Use this endpoint to define a custom tool with specific functionality and configuration for use by agents.

[Required authorization] Route required privileges: manage_agent_builder.", + "description": "Create a new tool. Use this endpoint to define a custom tool with specific functionality and configuration for use by agents. To learn more, refer to the [tools documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/tools).

[Required authorization] Route required privileges: manage_agent_builder.", "operationId": "post-agent-builder-tools", "parameters": [ { @@ -3398,7 +3408,7 @@ }, "/api/agent_builder/tools/_execute": { "post": { - "description": "Execute a tool with parameters. Use this endpoint to run a tool directly with specified inputs and optional external connector integration.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "Execute a tool with parameters. Use this endpoint to run a tool directly with specified inputs and optional external connector integration. To learn more, refer to the [tools documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/tools).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "post-agent-builder-tools-execute", "parameters": [ { @@ -3703,7 +3713,7 @@ }, "/api/agent_builder/tools/{toolId}": { "delete": { - "description": "Delete a tool by ID. This action cannot be undone.

[Required authorization] Route required privileges: manage_agent_builder.", + "description": "Delete a tool by ID. This action cannot be undone. To learn more, refer to the [tools documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/tools).

[Required authorization] Route required privileges: manage_agent_builder.", "operationId": "delete-agent-builder-tools-toolid", "parameters": [ { @@ -3750,7 +3760,7 @@ "x-state": "Added in 9.2.0" }, "get": { - "description": "Get a specific tool by ID. Use this endpoint to retrieve the complete tool definition including its schema and configuration requirements.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "Get a specific tool by ID. Use this endpoint to retrieve the complete tool definition including its schema and configuration requirements. To learn more, refer to the [tools documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/tools).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "get-agent-builder-tools-toolid", "parameters": [ { @@ -3887,7 +3897,7 @@ "x-state": "Added in 9.2.0" }, "put": { - "description": "Update an existing tool. Use this endpoint to modify any aspect of the tool's configuration or metadata.

[Required authorization] Route required privileges: manage_agent_builder.", + "description": "Update an existing tool. Use this endpoint to modify any aspect of the tool's configuration or metadata. To learn more, refer to the [tools documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/tools).

[Required authorization] Route required privileges: manage_agent_builder.", "operationId": "put-agent-builder-tools-toolid", "parameters": [ { @@ -57355,6 +57365,29 @@ "description": "A non-empty string.", "minLength": 1, "type": "string" + }, + "downsample": { + "items": { + "additionalProperties": false, + "properties": { + "after": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + }, + "fixed_interval": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "after", + "fixed_interval" + ], + "type": "object" + }, + "type": "array" } }, "type": "object" @@ -62268,6 +62301,7 @@ "minLength": 1, "type": "string" }, + "minItems": 1, "type": "array" }, "ignore_failure": { @@ -65416,6 +65450,29 @@ "description": "A non-empty string.", "minLength": 1, "type": "string" + }, + "downsample": { + "items": { + "additionalProperties": false, + "properties": { + "after": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + }, + "fixed_interval": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "after", + "fixed_interval" + ], + "type": "object" + }, + "type": "array" } }, "type": "object" @@ -70329,6 +70386,7 @@ "minLength": 1, "type": "string" }, + "minItems": 1, "type": "array" }, "ignore_failure": { @@ -73011,6 +73069,29 @@ "description": "A non-empty string.", "minLength": 1, "type": "string" + }, + "downsample": { + "items": { + "additionalProperties": false, + "properties": { + "after": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + }, + "fixed_interval": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "after", + "fixed_interval" + ], + "type": "object" + }, + "type": "array" } }, "type": "object" @@ -77924,6 +78005,7 @@ "minLength": 1, "type": "string" }, + "minItems": 1, "type": "array" }, "ignore_failure": { @@ -80721,6 +80803,29 @@ "description": "A non-empty string.", "minLength": 1, "type": "string" + }, + "downsample": { + "items": { + "additionalProperties": false, + "properties": { + "after": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + }, + "fixed_interval": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "after", + "fixed_interval" + ], + "type": "object" + }, + "type": "array" } }, "type": "object" @@ -85634,6 +85739,7 @@ "minLength": 1, "type": "string" }, + "minItems": 1, "type": "array" }, "ignore_failure": { @@ -88269,6 +88375,29 @@ "description": "A non-empty string.", "minLength": 1, "type": "string" + }, + "downsample": { + "items": { + "additionalProperties": false, + "properties": { + "after": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + }, + "fixed_interval": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "after", + "fixed_interval" + ], + "type": "object" + }, + "type": "array" } }, "type": "object" @@ -93182,6 +93311,7 @@ "minLength": 1, "type": "string" }, + "minItems": 1, "type": "array" }, "ignore_failure": { @@ -95790,6 +95920,29 @@ "description": "A non-empty string.", "minLength": 1, "type": "string" + }, + "downsample": { + "items": { + "additionalProperties": false, + "properties": { + "after": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + }, + "fixed_interval": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "after", + "fixed_interval" + ], + "type": "object" + }, + "type": "array" } }, "type": "object" @@ -100703,6 +100856,7 @@ "minLength": 1, "type": "string" }, + "minItems": 1, "type": "array" }, "ignore_failure": { diff --git a/oas_docs/bundle.serverless.json b/oas_docs/bundle.serverless.json index e95f2b7d8af5a..a484eca3e7e04 100644 --- a/oas_docs/bundle.serverless.json +++ b/oas_docs/bundle.serverless.json @@ -1222,7 +1222,7 @@ }, "/api/agent_builder/agents": { "get": { - "description": "List all available agents. Use this endpoint to retrieve complete agent information including their current configuration and assigned tools.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "List all available agents. Use this endpoint to retrieve complete agent information including their current configuration and assigned tools. To learn more, refer to the [agents documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/agent-builder-agents).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "get-agent-builder-agents", "parameters": [], "responses": { @@ -1293,7 +1293,7 @@ "x-state": "" }, "post": { - "description": "Create a new agent. Use this endpoint to define the agent's behavior, appearance, and capabilities through comprehensive configuration options.

[Required authorization] Route required privileges: manage_agent_builder.", + "description": "Create a new agent. Use this endpoint to define the agent's behavior, appearance, and capabilities through comprehensive configuration options. To learn more, refer to the [agents documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/agent-builder-agents).

[Required authorization] Route required privileges: manage_agent_builder.", "operationId": "post-agent-builder-agents", "parameters": [ { @@ -1465,7 +1465,7 @@ }, "/api/agent_builder/agents/{id}": { "delete": { - "description": "Delete an agent by ID. This action cannot be undone.

[Required authorization] Route required privileges: manage_agent_builder.", + "description": "Delete an agent by ID. This action cannot be undone. To learn more, refer to the [agents documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/agent-builder-agents).

[Required authorization] Route required privileges: manage_agent_builder.", "operationId": "delete-agent-builder-agents-id", "parameters": [ { @@ -1512,7 +1512,7 @@ "x-state": "" }, "get": { - "description": "Get a specific agent by ID. Use this endpoint to retrieve the complete agent definition including all configuration details and tool assignments.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "Get a specific agent by ID. Use this endpoint to retrieve the complete agent definition including all configuration details and tool assignments. To learn more, refer to the [agents documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/agent-builder-agents).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "get-agent-builder-agents-id", "parameters": [ { @@ -1571,7 +1571,7 @@ "x-state": "" }, "put": { - "description": "Update an existing agent configuration. Use this endpoint to modify any aspect of the agent's behavior, appearance, or capabilities.

[Required authorization] Route required privileges: manage_agent_builder.", + "description": "Update an existing agent configuration. Use this endpoint to modify any aspect of the agent's behavior, appearance, or capabilities. To learn more, refer to the [agents documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/agent-builder-agents).

[Required authorization] Route required privileges: manage_agent_builder.", "operationId": "put-agent-builder-agents-id", "parameters": [ { @@ -2483,7 +2483,7 @@ }, "/api/agent_builder/converse": { "post": { - "description": "Send a message to an agent and receive a complete response. This synchronous endpoint waits for the agent to fully process your request before returning the final result. Use this for simple chat interactions where you need the complete response.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "Send a message to an agent and receive a complete response. This synchronous endpoint waits for the agent to fully process your request before returning the final result. Use this for simple chat interactions where you need the complete response. To learn more, refer to the [agent chat documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/chat).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "post-agent-builder-converse", "parameters": [ { @@ -2954,9 +2954,19 @@ }, "/api/agent_builder/mcp": { "post": { - "description": "> warn\n> This endpoint is designed for MCP clients (Claude Desktop, Cursor, VS Code, etc.) and should not be used directly via REST APIs. Use MCP Inspector or native MCP clients instead.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "> warn\n> This endpoint is designed for MCP clients (Claude Desktop, Cursor, VS Code, etc.) and should not be used directly via REST APIs. Use MCP Inspector or native MCP clients instead.\nTo learn more, refer to the [MCP documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/mcp-server).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "post-agent-builder-mcp", - "parameters": [], + "parameters": [ + { + "description": "Comma-separated list of namespaces to filter tools. Only tools matching the specified namespaces will be returned.", + "in": "query", + "name": "namespace", + "required": false, + "schema": { + "type": "string" + } + } + ], "requestBody": { "content": { "application/json": { @@ -3021,7 +3031,7 @@ }, "/api/agent_builder/tools": { "get": { - "description": "List all available tools. Use this endpoint to retrieve complete tool definitions including their schemas and configuration requirements.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "List all available tools. Use this endpoint to retrieve complete tool definitions including their schemas and configuration requirements. To learn more, refer to the [tools documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/tools).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "get-agent-builder-tools", "parameters": [], "responses": { @@ -3195,7 +3205,7 @@ "x-state": "" }, "post": { - "description": "Create a new tool. Use this endpoint to define a custom tool with specific functionality and configuration for use by agents.

[Required authorization] Route required privileges: manage_agent_builder.", + "description": "Create a new tool. Use this endpoint to define a custom tool with specific functionality and configuration for use by agents. To learn more, refer to the [tools documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/tools).

[Required authorization] Route required privileges: manage_agent_builder.", "operationId": "post-agent-builder-tools", "parameters": [ { @@ -3398,7 +3408,7 @@ }, "/api/agent_builder/tools/_execute": { "post": { - "description": "Execute a tool with parameters. Use this endpoint to run a tool directly with specified inputs and optional external connector integration.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "Execute a tool with parameters. Use this endpoint to run a tool directly with specified inputs and optional external connector integration. To learn more, refer to the [tools documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/tools).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "post-agent-builder-tools-execute", "parameters": [ { @@ -3703,7 +3713,7 @@ }, "/api/agent_builder/tools/{toolId}": { "delete": { - "description": "Delete a tool by ID. This action cannot be undone.

[Required authorization] Route required privileges: manage_agent_builder.", + "description": "Delete a tool by ID. This action cannot be undone. To learn more, refer to the [tools documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/tools).

[Required authorization] Route required privileges: manage_agent_builder.", "operationId": "delete-agent-builder-tools-toolid", "parameters": [ { @@ -3750,7 +3760,7 @@ "x-state": "" }, "get": { - "description": "Get a specific tool by ID. Use this endpoint to retrieve the complete tool definition including its schema and configuration requirements.

[Required authorization] Route required privileges: read_agent_builder.", + "description": "Get a specific tool by ID. Use this endpoint to retrieve the complete tool definition including its schema and configuration requirements. To learn more, refer to the [tools documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/tools).

[Required authorization] Route required privileges: read_agent_builder.", "operationId": "get-agent-builder-tools-toolid", "parameters": [ { @@ -3887,7 +3897,7 @@ "x-state": "" }, "put": { - "description": "Update an existing tool. Use this endpoint to modify any aspect of the tool's configuration or metadata.

[Required authorization] Route required privileges: manage_agent_builder.", + "description": "Update an existing tool. Use this endpoint to modify any aspect of the tool's configuration or metadata. To learn more, refer to the [tools documentation](https://www.elastic.co/docs/explore-analyze/ai-features/agent-builder/tools).

[Required authorization] Route required privileges: manage_agent_builder.", "operationId": "put-agent-builder-tools-toolid", "parameters": [ { @@ -56423,6 +56433,29 @@ "description": "A non-empty string.", "minLength": 1, "type": "string" + }, + "downsample": { + "items": { + "additionalProperties": false, + "properties": { + "after": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + }, + "fixed_interval": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "after", + "fixed_interval" + ], + "type": "object" + }, + "type": "array" } }, "type": "object" @@ -61336,6 +61369,7 @@ "minLength": 1, "type": "string" }, + "minItems": 1, "type": "array" }, "ignore_failure": { @@ -64484,6 +64518,29 @@ "description": "A non-empty string.", "minLength": 1, "type": "string" + }, + "downsample": { + "items": { + "additionalProperties": false, + "properties": { + "after": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + }, + "fixed_interval": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "after", + "fixed_interval" + ], + "type": "object" + }, + "type": "array" } }, "type": "object" @@ -69397,6 +69454,7 @@ "minLength": 1, "type": "string" }, + "minItems": 1, "type": "array" }, "ignore_failure": { @@ -72079,6 +72137,29 @@ "description": "A non-empty string.", "minLength": 1, "type": "string" + }, + "downsample": { + "items": { + "additionalProperties": false, + "properties": { + "after": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + }, + "fixed_interval": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "after", + "fixed_interval" + ], + "type": "object" + }, + "type": "array" } }, "type": "object" @@ -76992,6 +77073,7 @@ "minLength": 1, "type": "string" }, + "minItems": 1, "type": "array" }, "ignore_failure": { @@ -79789,6 +79871,29 @@ "description": "A non-empty string.", "minLength": 1, "type": "string" + }, + "downsample": { + "items": { + "additionalProperties": false, + "properties": { + "after": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + }, + "fixed_interval": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "after", + "fixed_interval" + ], + "type": "object" + }, + "type": "array" } }, "type": "object" @@ -84702,6 +84807,7 @@ "minLength": 1, "type": "string" }, + "minItems": 1, "type": "array" }, "ignore_failure": { @@ -87337,6 +87443,29 @@ "description": "A non-empty string.", "minLength": 1, "type": "string" + }, + "downsample": { + "items": { + "additionalProperties": false, + "properties": { + "after": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + }, + "fixed_interval": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "after", + "fixed_interval" + ], + "type": "object" + }, + "type": "array" } }, "type": "object" @@ -92250,6 +92379,7 @@ "minLength": 1, "type": "string" }, + "minItems": 1, "type": "array" }, "ignore_failure": { @@ -94858,6 +94988,29 @@ "description": "A non-empty string.", "minLength": 1, "type": "string" + }, + "downsample": { + "items": { + "additionalProperties": false, + "properties": { + "after": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + }, + "fixed_interval": { + "description": "A non-empty string.", + "minLength": 1, + "type": "string" + } + }, + "required": [ + "after", + "fixed_interval" + ], + "type": "object" + }, + "type": "array" } }, "type": "object" @@ -99771,6 +99924,7 @@ "minLength": 1, "type": "string" }, + "minItems": 1, "type": "array" }, "ignore_failure": { From 398abc606839a575902d5385ed6b671e46afc115 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Tue, 17 Feb 2026 13:24:06 +0100 Subject: [PATCH 4/5] refactor: change computeChange to use options object Changed the interface of computeChange from positional parameters to an options object with descriptive keys (isExistingStream, hasMeaningfulValue, hasChanged) for improved readability at call sites. Co-authored-by: Cursor --- .../streams/classic_stream.ts | 48 +++++++------- .../state_management/streams/helpers.ts | 23 ++++--- .../state_management/streams/wired_stream.ts | 64 ++++++++++--------- 3 files changed, 71 insertions(+), 64 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts index cafca32d3b8d7..1ed4c0383e7c2 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.ts @@ -102,56 +102,56 @@ export class ClassicStream extends StreamActiveRecord 0, - () => + hasMeaningfulValue: (this._definition.ingest.processing.steps || []).length > 0, + hasChanged: () => !_.isEqual( _.omit(this._definition.ingest.processing, ['updated_at']), _.omit(startingStateStreamDefinition!.ingest.processing, ['updated_at']) - ) - ); + ), + }); - this._changes.lifecycle = computeChange( + this._changes.lifecycle = computeChange({ isExistingStream, - !isInheritLifecycle(this._definition.ingest.lifecycle), - () => + hasMeaningfulValue: !isInheritLifecycle(this._definition.ingest.lifecycle), + hasChanged: () => !_.isEqual( this._definition.ingest.lifecycle, startingStateStreamDefinition!.ingest.lifecycle - ) - ); + ), + }); // Prefetch effective settings for existing streams to allow sync comparison const effectiveSettings = isExistingStream ? await this.getEffectiveSettings() : undefined; - this._changes.settings = computeChange( + this._changes.settings = computeChange({ isExistingStream, - Object.keys(this._definition.ingest.settings || {}).length > 0, - () => !_.isEqual(effectiveSettings, this._definition.ingest.settings) - ); + hasMeaningfulValue: Object.keys(this._definition.ingest.settings || {}).length > 0, + hasChanged: () => !_.isEqual(effectiveSettings, this._definition.ingest.settings), + }); - this._changes.field_overrides = computeChange( + this._changes.field_overrides = computeChange({ isExistingStream, - !!( + hasMeaningfulValue: !!( this._definition.ingest.classic.field_overrides && Object.keys(this._definition.ingest.classic.field_overrides).length > 0 ), - () => + hasChanged: () => !_.isEqual( this._definition.ingest.classic.field_overrides, startingStateStreamDefinition!.ingest.classic.field_overrides - ) - ); + ), + }); - this._changes.failure_store = computeChange( + this._changes.failure_store = computeChange({ isExistingStream, - !isInheritFailureStore(this._definition.ingest.failure_store), - () => + hasMeaningfulValue: !isInheritFailureStore(this._definition.ingest.failure_store), + hasChanged: () => !_.isEqual( this._definition.ingest.failure_store, startingStateStreamDefinition!.ingest.failure_store - ) - ); + ), + }); this._changes.query_streams = !startingStateStreamDefinition || diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/helpers.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/helpers.ts index b7376a73062a3..7e3741d2bf2b6 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/helpers.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/helpers.ts @@ -10,21 +10,26 @@ import type { BaseStream } from '@kbn/streams-schema/src/models/base'; import type { State } from '../state'; import type { ValidationResult } from '../stream_active_record/stream_active_record'; +interface ComputeChangeOptions { + /** Whether the stream already exists in the starting state */ + isExistingStream: boolean; + /** Whether the new value is meaningful (non-empty/non-default) */ + hasMeaningfulValue: boolean; + /** Whether the value changed compared to the starting state (only evaluated for existing streams) */ + hasChanged: () => boolean; +} + /** * Determines if a change flag should be set for a stream property. * * For existing streams (isExistingStream=true): returns true if the values are not equal (hasChanged) * For new streams (isExistingStream=false): returns true if the value is meaningful/non-empty (hasMeaningfulValue) - * - * @param isExistingStream - Whether the stream already exists in the starting state - * @param hasMeaningfulValue - Whether the new value is meaningful (non-empty/non-default) - * @param hasChanged - Whether the value changed compared to the starting state (only evaluated for existing streams) */ -export function computeChange( - isExistingStream: boolean, - hasMeaningfulValue: boolean, - hasChanged: () => boolean -): boolean { +export function computeChange({ + isExistingStream, + hasMeaningfulValue, + hasChanged, +}: ComputeChangeOptions): boolean { return isExistingStream ? hasChanged() : hasMeaningfulValue; } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts index 02d4490596e7a..b1f691dda2e96 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.ts @@ -137,63 +137,65 @@ export class WiredStream extends StreamActiveRecord 0, - () => + hasMeaningfulValue: Object.keys(this._definition.ingest.wired.fields || {}).length > 0, + hasChanged: () => !_.isEqual( this._definition.ingest.wired.fields, startingStateStreamDefinition!.ingest.wired.fields - ) - ); + ), + }); - this._changes.routing = computeChange( + this._changes.routing = computeChange({ isExistingStream, - (this._definition.ingest.wired.routing || []).length > 0, - () => + hasMeaningfulValue: (this._definition.ingest.wired.routing || []).length > 0, + hasChanged: () => !_.isEqual( this._definition.ingest.wired.routing, startingStateStreamDefinition!.ingest.wired.routing - ) - ); + ), + }); - this._changes.failure_store = computeChange( + this._changes.failure_store = computeChange({ isExistingStream, - !isInheritFailureStore(this._definition.ingest.failure_store), - () => + hasMeaningfulValue: !isInheritFailureStore(this._definition.ingest.failure_store), + hasChanged: () => !_.isEqual( this._definition.ingest.failure_store, startingStateStreamDefinition!.ingest.failure_store - ) - ); + ), + }); - this._changes.processing = computeChange( + this._changes.processing = computeChange({ isExistingStream, - (this._definition.ingest.processing.steps || []).length > 0, - () => + hasMeaningfulValue: (this._definition.ingest.processing.steps || []).length > 0, + hasChanged: () => !_.isEqual( _.omit(this._definition.ingest.processing, ['updated_at']), _.omit(startingStateStreamDefinition!.ingest.processing, ['updated_at']) - ) - ); + ), + }); - this._changes.lifecycle = computeChange( + this._changes.lifecycle = computeChange({ isExistingStream, - !isInheritLifecycle(this._definition.ingest.lifecycle), - () => + hasMeaningfulValue: !isInheritLifecycle(this._definition.ingest.lifecycle), + hasChanged: () => !_.isEqual( this._definition.ingest.lifecycle, startingStateStreamDefinition!.ingest.lifecycle - ) - ); + ), + }); - const hasSettings = Object.keys(this._definition.ingest.settings || {}).length > 0; - this._changes.settings = computeChange( + this._changes.settings = computeChange({ isExistingStream, - hasSettings, - () => - !_.isEqual(this._definition.ingest.settings, startingStateStreamDefinition!.ingest.settings) - ); + hasMeaningfulValue: Object.keys(this._definition.ingest.settings || {}).length > 0, + hasChanged: () => + !_.isEqual( + this._definition.ingest.settings, + startingStateStreamDefinition!.ingest.settings + ), + }); this._changes.query_streams = !startingStateStreamDefinition || From e862ed0c1a97f56fde42ec441c28597b8279c6dc Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Thu, 19 Feb 2026 10:41:08 +0100 Subject: [PATCH 5/5] fix(streams): resolve no-explicit-any ESLint errors after merge Update test files to use proper type annotations instead of `as any` casts to comply with the no-explicit-any ESLint rule that was added to the streams plugins. Co-authored-by: Cursor --- .../streams/classic_stream.test.ts | 163 ++++++++++++++---- .../streams/wired_stream.test.ts | 136 ++++++++++++--- 2 files changed, 239 insertions(+), 60 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts index 8f7f24f7cba67..de4b995f5371c 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/classic_stream.test.ts @@ -7,8 +7,27 @@ import type { Streams } from '@kbn/streams-schema'; import { ClassicStream } from './classic_stream'; -import type { StateDependencies } from '../types'; +import type { StateDependencies, StreamChange } from '../types'; import type { State } from '../state'; +import type { StreamChangeStatus } from '../stream_active_record/stream_active_record'; + +interface ClassicStreamChanges { + processing: boolean; + field_overrides: boolean; + failure_store: boolean; + lifecycle: boolean; + settings: boolean; + query_streams: boolean; +} + +interface ClassicStreamTestable { + _changes: ClassicStreamChanges; + doHandleUpsertChange( + definition: Streams.all.Definition, + desiredState: State, + startingState: State + ): Promise<{ cascadingChanges: StreamChange[]; changeStatus: StreamChangeStatus }>; +} describe('ClassicStream', () => { const createMockDependencies = (): StateDependencies => @@ -76,9 +95,13 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.processing).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.processing).toBe(false); }); it('sets processing to true when processing.steps is non-empty for new stream', async () => { @@ -100,9 +123,13 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.processing).toBe(true); + expect((stream as unknown as ClassicStreamTestable)._changes.processing).toBe(true); }); it('sets lifecycle to false when using inherit lifecycle for new stream', async () => { @@ -119,7 +146,11 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); expect(stream.hasChangedLifecycle()).toBe(false); }); @@ -138,7 +169,11 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); expect(stream.hasChangedLifecycle()).toBe(true); }); @@ -157,9 +192,13 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.settings).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.settings).toBe(false); }); it('sets settings to true when settings is non-empty for new stream', async () => { @@ -176,9 +215,13 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.settings).toBe(true); + expect((stream as unknown as ClassicStreamTestable)._changes.settings).toBe(true); }); it('sets field_overrides to false when field_overrides is undefined for new stream', async () => { @@ -195,9 +238,13 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.field_overrides).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.field_overrides).toBe(false); }); it('sets field_overrides to false when field_overrides is empty object for new stream', async () => { @@ -214,9 +261,13 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.field_overrides).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.field_overrides).toBe(false); }); it('sets field_overrides to true when field_overrides is non-empty for new stream', async () => { @@ -233,9 +284,13 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.field_overrides).toBe(true); + expect((stream as unknown as ClassicStreamTestable)._changes.field_overrides).toBe(true); }); it('sets failure_store to false when using inherit failure_store for new stream', async () => { @@ -252,9 +307,13 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.failure_store).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.failure_store).toBe(false); }); it('sets failure_store to true when using non-inherit failure_store for new stream', async () => { @@ -271,9 +330,13 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.failure_store).toBe(true); + expect((stream as unknown as ClassicStreamTestable)._changes.failure_store).toBe(true); }); it('sets all _changes to false when creating stream with all empty/default values', async () => { @@ -282,13 +345,17 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.processing).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.processing).toBe(false); expect(stream.hasChangedLifecycle()).toBe(false); - expect((stream as any)._changes.settings).toBe(false); - expect((stream as any)._changes.field_overrides).toBe(false); - expect((stream as any)._changes.failure_store).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.settings).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.field_overrides).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.failure_store).toBe(false); }); }); @@ -324,9 +391,13 @@ describe('ClassicStream', () => { new Map([['logs-test-default', { definition: existingDefinition }]]) ); - await (stream as any).doHandleUpsertChange(newDefinition, startingState, startingState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + newDefinition, + startingState, + startingState + ); - expect((stream as any)._changes.processing).toBe(true); + expect((stream as unknown as ClassicStreamTestable)._changes.processing).toBe(true); }); it('sets processing to false when processing unchanged for existing stream', async () => { @@ -348,9 +419,13 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const startingState = createMockState(new Map([['logs-test-default', { definition }]])); - await (stream as any).doHandleUpsertChange(definition, startingState, startingState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + startingState, + startingState + ); - expect((stream as any)._changes.processing).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.processing).toBe(false); }); it('sets lifecycle to true when lifecycle changed for existing stream', async () => { @@ -379,7 +454,11 @@ describe('ClassicStream', () => { new Map([['logs-test-default', { definition: existingDefinition }]]) ); - await (stream as any).doHandleUpsertChange(newDefinition, startingState, startingState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + newDefinition, + startingState, + startingState + ); expect(stream.hasChangedLifecycle()).toBe(true); }); @@ -398,7 +477,11 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const startingState = createMockState(new Map([['logs-test-default', { definition }]])); - await (stream as any).doHandleUpsertChange(definition, startingState, startingState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + startingState, + startingState + ); expect(stream.hasChangedLifecycle()).toBe(false); }); @@ -429,9 +512,13 @@ describe('ClassicStream', () => { new Map([['logs-test-default', { definition: existingDefinition }]]) ); - await (stream as any).doHandleUpsertChange(newDefinition, startingState, startingState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + newDefinition, + startingState, + startingState + ); - expect((stream as any)._changes.field_overrides).toBe(true); + expect((stream as unknown as ClassicStreamTestable)._changes.field_overrides).toBe(true); }); it('sets field_overrides to false when field_overrides unchanged for existing stream', async () => { @@ -448,9 +535,13 @@ describe('ClassicStream', () => { const stream = new ClassicStream(definition, createMockDependencies()); const startingState = createMockState(new Map([['logs-test-default', { definition }]])); - await (stream as any).doHandleUpsertChange(definition, startingState, startingState); + await (stream as unknown as ClassicStreamTestable).doHandleUpsertChange( + definition, + startingState, + startingState + ); - expect((stream as any)._changes.field_overrides).toBe(false); + expect((stream as unknown as ClassicStreamTestable)._changes.field_overrides).toBe(false); }); }); }); diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.test.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.test.ts index 3c1344954f1a9..aa765a9f9f964 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.test.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/streams/wired_stream.test.ts @@ -7,8 +7,28 @@ import type { Streams } from '@kbn/streams-schema'; import { WiredStream } from './wired_stream'; -import type { StateDependencies } from '../types'; +import type { StateDependencies, StreamChange } from '../types'; import type { State } from '../state'; +import type { StreamChangeStatus } from '../stream_active_record/stream_active_record'; + +interface WiredStreamChanges { + ownFields: boolean; + ownRouting: boolean; + routing: boolean; + processing: boolean; + lifecycle: boolean; + settings: boolean; + failure_store: boolean; +} + +interface WiredStreamTestable { + _changes: WiredStreamChanges; + doHandleUpsertChange( + definition: Streams.all.Definition, + desiredState: State, + startingState: State + ): Promise<{ cascadingChanges: StreamChange[]; changeStatus: StreamChangeStatus }>; +} describe('WiredStream', () => { const createMockDependencies = (): StateDependencies => @@ -61,7 +81,11 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); expect(stream.hasChangedFields()).toBe(false); }); @@ -83,7 +107,11 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); expect(stream.hasChangedFields()).toBe(true); }); @@ -102,9 +130,13 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.routing).toBe(false); + expect((stream as unknown as WiredStreamTestable)._changes.routing).toBe(false); }); it('sets routing to true when routing is non-empty for new stream', async () => { @@ -124,9 +156,13 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.routing).toBe(true); + expect((stream as unknown as WiredStreamTestable)._changes.routing).toBe(true); }); it('sets processing to false when processing.steps is empty for new stream', async () => { @@ -143,9 +179,13 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.processing).toBe(false); + expect((stream as unknown as WiredStreamTestable)._changes.processing).toBe(false); }); it('sets processing to true when processing.steps is non-empty for new stream', async () => { @@ -167,9 +207,13 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); - expect((stream as any)._changes.processing).toBe(true); + expect((stream as unknown as WiredStreamTestable)._changes.processing).toBe(true); }); it('sets lifecycle to false when using inherit lifecycle for new stream', async () => { @@ -186,7 +230,11 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); expect(stream.hasChangedLifecycle()).toBe(false); }); @@ -205,7 +253,11 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); expect(stream.hasChangedLifecycle()).toBe(true); }); @@ -224,7 +276,11 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); expect(stream.hasChangedSettings()).toBe(false); }); @@ -243,7 +299,11 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); expect(stream.hasChangedSettings()).toBe(true); }); @@ -262,7 +322,11 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); expect(stream.hasChangedFailureStore()).toBe(false); }); @@ -281,7 +345,11 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); expect(stream.hasChangedFailureStore()).toBe(true); }); @@ -292,11 +360,15 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const emptyState = createMockState(); - await (stream as any).doHandleUpsertChange(definition, emptyState, emptyState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + emptyState, + emptyState + ); expect(stream.hasChangedFields()).toBe(false); - expect((stream as any)._changes.routing).toBe(false); - expect((stream as any)._changes.processing).toBe(false); + expect((stream as unknown as WiredStreamTestable)._changes.routing).toBe(false); + expect((stream as unknown as WiredStreamTestable)._changes.processing).toBe(false); expect(stream.hasChangedLifecycle()).toBe(false); expect(stream.hasChangedSettings()).toBe(false); expect(stream.hasChangedFailureStore()).toBe(false); @@ -330,7 +402,11 @@ describe('WiredStream', () => { new Map([['logs.test', { definition: existingDefinition }]]) ); - await (stream as any).doHandleUpsertChange(newDefinition, startingState, startingState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + newDefinition, + startingState, + startingState + ); expect(stream.hasChangedFields()).toBe(true); }); @@ -349,7 +425,11 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const startingState = createMockState(new Map([['logs.test', { definition }]])); - await (stream as any).doHandleUpsertChange(definition, startingState, startingState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + startingState, + startingState + ); expect(stream.hasChangedFields()).toBe(false); }); @@ -380,7 +460,11 @@ describe('WiredStream', () => { new Map([['logs.test', { definition: existingDefinition }]]) ); - await (stream as any).doHandleUpsertChange(newDefinition, startingState, startingState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + newDefinition, + startingState, + startingState + ); expect(stream.hasChangedLifecycle()).toBe(true); }); @@ -399,7 +483,11 @@ describe('WiredStream', () => { const stream = new WiredStream(definition, createMockDependencies()); const startingState = createMockState(new Map([['logs.test', { definition }]])); - await (stream as any).doHandleUpsertChange(definition, startingState, startingState); + await (stream as unknown as WiredStreamTestable).doHandleUpsertChange( + definition, + startingState, + startingState + ); expect(stream.hasChangedLifecycle()).toBe(false); });