From 5bc843036c3ba60c809ab8c34f59e4c4515dc741 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Fri, 6 Jun 2025 12:38:29 +0200 Subject: [PATCH 1/4] add lock manager --- .../streams/server/lib/streams/client.ts | 3 +- .../streams/server/lib/streams/service.ts | 1 + .../errors/concurrent_access_error.ts | 15 ++++ .../lib/streams/state_management/state.ts | 44 ++++++++---- .../lib/streams/state_management/types.ts | 3 +- .../apis/observability/streams/conflicts.ts | 70 +++++++++++++++++++ .../apis/observability/streams/index.ts | 1 + 7 files changed, 122 insertions(+), 15 deletions(-) create mode 100644 x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/errors/concurrent_access_error.ts create mode 100644 x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/conflicts.ts diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts index cb42a7e3fd965..25f99eb9f6636 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts @@ -11,7 +11,7 @@ import { QueryDslQueryContainer, Result, } from '@elastic/elasticsearch/lib/api/types'; -import type { IScopedClusterClient, Logger, KibanaRequest } from '@kbn/core/server'; +import type { IScopedClusterClient, Logger, KibanaRequest, CoreSetup } from '@kbn/core/server'; import { isNotFoundError } from '@kbn/es-errors'; import { Condition, getAncestors, getParentId, Streams } from '@kbn/streams-schema'; import { AssetClient } from './assets/asset_client'; @@ -57,6 +57,7 @@ function wrapEsCall(p: Promise): Promise { export class StreamsClient { constructor( private readonly dependencies: { + coreSetup: CoreSetup; scopedClusterClient: IScopedClusterClient; assetClient: AssetClient; storageClient: StreamsStorageClient; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts index 6b5108063adec..2085d334090a7 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts @@ -63,6 +63,7 @@ export class StreamsService { assetClient, logger, scopedClusterClient, + coreSetup: this.coreSetup, storageClient: storageAdapter.getClient(), request, isServerless, diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/errors/concurrent_access_error.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/errors/concurrent_access_error.ts new file mode 100644 index 0000000000000..c77455b13bfab --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/errors/concurrent_access_error.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StatusError } from '../../errors/status_error'; + +export class ConcurrentAccessError extends StatusError { + constructor(message: string) { + super(message, 409); + this.name = 'ConcurrentAccessError'; + } +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts index e613cc5caa132..3d19d082707c4 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts @@ -6,6 +6,7 @@ */ import { difference, intersection, isEqual } from 'lodash'; +import { LockAcquisitionError, LockManagerService } from '@kbn/lock-manager'; import { StatusError } from '../errors/status_error'; import { FailedToApplyRequestedChangesError } from './errors/failed_to_apply_requested_changes_error'; import { FailedToDetermineElasticsearchActionsError } from './errors/failed_to_determine_elasticsearch_actions_error'; @@ -20,6 +21,7 @@ import type { } from './stream_active_record/stream_active_record'; import { streamFromDefinition } from './stream_active_record/stream_from_definition'; import type { StateDependencies, StreamChange } from './types'; +import { ConcurrentAccessError } from './errors/concurrent_access_error'; interface Changes { created: string[]; @@ -81,19 +83,35 @@ export class State { elasticsearchActions, }; } else { - try { - await desiredState.commitChanges(startingState); - return { status: 'success', changes: desiredState.changes(startingState) }; - } catch (error) { - await desiredState.attemptRollback(startingState, error); - return { - status: 'failed_with_rollback', - error: new StatusError( - `Failed to apply changes but successfully rolled back to previous state: ${error.message}`, - error.statusCode ?? 500 - ), - }; - } + const lmService = new LockManagerService(dependencies.coreSetup, dependencies.logger); + return lmService + .withLock('streams_api', async () => { + try { + // wait for 10s + await new Promise((resolve, reject) => { + setTimeout(() => { + resolve(null); + }, 10000); + }); + await desiredState.commitChanges(startingState); + return { status: 'success' as const, changes: desiredState.changes(startingState) }; + } catch (error) { + await desiredState.attemptRollback(startingState, error); + return { + status: 'failed_with_rollback' as const, + error: new StatusError( + `Failed to apply changes but successfully rolled back to previous state: ${error.message}`, + error.statusCode ?? 500 + ), + }; + } + }) + .catch((error) => { + if (error instanceof LockAcquisitionError) { + throw new ConcurrentAccessError('Could not acquire lock for applying changes'); + } + throw error; + }); } } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts index 0eec791722235..44127b7a0bd0b 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts @@ -5,7 +5,7 @@ * 2.0. */ -import type { IScopedClusterClient, Logger } from '@kbn/core/server'; +import type { CoreSetup, IScopedClusterClient, Logger } from '@kbn/core/server'; import { Streams } from '@kbn/streams-schema'; import type { AssetClient } from '../assets/asset_client'; import type { StreamsClient } from '../client'; @@ -25,6 +25,7 @@ export type StreamChange = StreamUpsertChange | StreamDeleteChange; export interface StateDependencies { logger: Logger; + coreSetup: CoreSetup; streamsClient: StreamsClient; storageClient: StreamsStorageClient; scopedClusterClient: IScopedClusterClient; diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/conflicts.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/conflicts.ts new file mode 100644 index 0000000000000..cf4f495e73c29 --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/conflicts.ts @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context'; +import { disableStreams, enableStreams, forkStream } from './helpers/requests'; +import { + StreamsSupertestRepositoryClient, + createStreamsRepositoryAdminClient, +} from './helpers/repository_client'; + +export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { + const roleScopedSupertest = getService('roleScopedSupertest'); + let apiClient: StreamsSupertestRepositoryClient; + + describe('conflict handling', function () { + before(async () => { + apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest); + await enableStreams(apiClient); + }); + + after(async () => { + await disableStreams(apiClient); + }); + + it('should not allow multiple requests manipulating streams state at once', async () => { + const stream1 = { + stream: { + name: 'logs.nginx', + }, + if: { + field: 'resource.attributes.host.name', + operator: 'eq' as const, + value: 'routeme', + }, + }; + const stream2 = { + stream: { + name: 'logs.apache', + }, + if: { + field: 'resource.attributes.host.name', + operator: 'eq' as const, + value: 'routeme2', + }, + }; + const responses = await Promise.allSettled([ + forkStream(apiClient, 'logs', stream1), + forkStream(apiClient, 'logs', stream2), + ]); + // Assert than one of the requests failed with a conflict error and the other succeeded + // It needs to check either way (success or failure) because the order of execution is not guaranteed + expect(responses).to.have.length(2); + const successResponse = responses.find( + (response) => response.status === 'fulfilled' && response.value.acknowledged + ); + const conflictResponse = responses.find( + (response) => + response.status === 'rejected' && + String(response.reason).toLowerCase().includes('conflict') + ); + expect(successResponse).to.not.be(undefined); + expect(conflictResponse).to.not.be(undefined); + }); + }); +} diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts index e0069d24052cf..e8e0621f0b1ed 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts @@ -26,5 +26,6 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext) loadTestFile(require.resolve('./content')); loadTestFile(require.resolve('./migration_on_read')); loadTestFile(require.resolve('./meta_data')); + loadTestFile(require.resolve('./conflicts')); }); } From 19e7b6114674bca7a174a0d11591d9798498ff70 Mon Sep 17 00:00:00 2001 From: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Date: Fri, 6 Jun 2025 10:56:26 +0000 Subject: [PATCH 2/4] [CI] Auto-commit changed files from 'node scripts/styled_components_mapping' --- x-pack/platform/plugins/shared/streams/tsconfig.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/platform/plugins/shared/streams/tsconfig.json b/x-pack/platform/plugins/shared/streams/tsconfig.json index a1122b1fb8b10..95768c441dbdc 100644 --- a/x-pack/platform/plugins/shared/streams/tsconfig.json +++ b/x-pack/platform/plugins/shared/streams/tsconfig.json @@ -50,6 +50,7 @@ "@kbn/i18n", "@kbn/zod-helpers", "@kbn/core-http-server-utils", - "@kbn/inference-common" + "@kbn/inference-common", + "@kbn/lock-manager" ] } From 5be78109a638bcf06fbe5997f78988065893c5f7 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Fri, 6 Jun 2025 15:15:03 +0200 Subject: [PATCH 3/4] move things where they belong --- .../shared/streams/server/lib/streams/client.ts | 5 +++-- .../shared/streams/server/lib/streams/service.ts | 3 ++- .../server/lib/streams/state_management/state.test.ts | 4 ++++ .../server/lib/streams/state_management/state.ts | 10 ++-------- .../server/lib/streams/state_management/types.ts | 5 +++-- 5 files changed, 14 insertions(+), 13 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts index 59bfd24767ab5..0dc9eb607f1ed 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts @@ -11,9 +11,10 @@ import { QueryDslQueryContainer, Result, } from '@elastic/elasticsearch/lib/api/types'; -import type { IScopedClusterClient, Logger, KibanaRequest, CoreSetup } from '@kbn/core/server'; +import type { IScopedClusterClient, Logger, KibanaRequest } from '@kbn/core/server'; import { isNotFoundError } from '@kbn/es-errors'; import { Condition, Streams, getAncestors, getParentId } from '@kbn/streams-schema'; +import { LockManagerService } from '@kbn/lock-manager'; import { AssetClient } from './assets/asset_client'; import { ASSET_ID, ASSET_TYPE } from './assets/fields'; import { QueryClient } from './assets/query/query_client'; @@ -58,7 +59,7 @@ function wrapEsCall(p: Promise): Promise { export class StreamsClient { constructor( private readonly dependencies: { - coreSetup: CoreSetup; + lockManager: LockManagerService; scopedClusterClient: IScopedClusterClient; assetClient: AssetClient; queryClient: QueryClient; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts index c57e9e83d52cb..9e5ff87e483b4 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts @@ -8,6 +8,7 @@ import type { CoreSetup, KibanaRequest, Logger } from '@kbn/core/server'; import { IStorageClient, StorageIndexAdapter, StorageSettings, types } from '@kbn/storage-adapter'; import { Streams } from '@kbn/streams-schema'; +import { LockManagerService } from '@kbn/lock-manager'; import type { StreamsPluginStartDependencies } from '../../types'; import { AssetClient } from './assets/asset_client'; import { QueryClient } from './assets/query/query_client'; @@ -66,7 +67,7 @@ export class StreamsService { queryClient, logger, scopedClusterClient, - coreSetup: this.coreSetup, + lockManager: new LockManagerService(this.coreSetup, logger), storageClient: storageAdapter.getClient(), request, isServerless, diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.test.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.test.ts index db58a15400271..36dfe87e083c0 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.test.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.test.ts @@ -21,6 +21,7 @@ import { StreamChange } from './types'; import { ElasticsearchAction } from './execution_plan/types'; import { ExecutionPlan } from './execution_plan/execution_plan'; import { Streams } from '@kbn/streams-schema'; +import { LockManagerService } from '@kbn/lock-manager'; describe('State', () => { const searchMock = jest.fn(); @@ -29,6 +30,9 @@ describe('State', () => { }; const stateDependenciesMock = { storageClient: storageClientMock, + lockManager: { + withLock: (_, cb) => cb(), + } as LockManagerService, isDev: true, } as any; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts index 3d19d082707c4..60c90f6223f9c 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts @@ -6,7 +6,7 @@ */ import { difference, intersection, isEqual } from 'lodash'; -import { LockAcquisitionError, LockManagerService } from '@kbn/lock-manager'; +import { LockAcquisitionError } from '@kbn/lock-manager'; import { StatusError } from '../errors/status_error'; import { FailedToApplyRequestedChangesError } from './errors/failed_to_apply_requested_changes_error'; import { FailedToDetermineElasticsearchActionsError } from './errors/failed_to_determine_elasticsearch_actions_error'; @@ -83,16 +83,10 @@ export class State { elasticsearchActions, }; } else { - const lmService = new LockManagerService(dependencies.coreSetup, dependencies.logger); + const lmService = dependencies.lockManager; return lmService .withLock('streams_api', async () => { try { - // wait for 10s - await new Promise((resolve, reject) => { - setTimeout(() => { - resolve(null); - }, 10000); - }); await desiredState.commitChanges(startingState); return { status: 'success' as const, changes: desiredState.changes(startingState) }; } catch (error) { diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts index 44127b7a0bd0b..6af295e71d5ab 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts @@ -5,8 +5,9 @@ * 2.0. */ -import type { CoreSetup, IScopedClusterClient, Logger } from '@kbn/core/server'; +import type { IScopedClusterClient, Logger } from '@kbn/core/server'; import { Streams } from '@kbn/streams-schema'; +import { LockManagerService } from '@kbn/lock-manager'; import type { AssetClient } from '../assets/asset_client'; import type { StreamsClient } from '../client'; import type { StreamsStorageClient } from '../service'; @@ -25,7 +26,7 @@ export type StreamChange = StreamUpsertChange | StreamDeleteChange; export interface StateDependencies { logger: Logger; - coreSetup: CoreSetup; + lockManager: LockManagerService; streamsClient: StreamsClient; storageClient: StreamsStorageClient; scopedClusterClient: IScopedClusterClient; From edbce3bb6a25f612e4f6af6ca95cece561b26d6c Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 16 Jun 2025 15:39:12 +0200 Subject: [PATCH 4/4] comments --- .../streams/server/lib/streams/state_management/state.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts index 60c90f6223f9c..34b663beb125a 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts @@ -6,7 +6,7 @@ */ import { difference, intersection, isEqual } from 'lodash'; -import { LockAcquisitionError } from '@kbn/lock-manager'; +import { isLockAcquisitionError } from '@kbn/lock-manager'; import { StatusError } from '../errors/status_error'; import { FailedToApplyRequestedChangesError } from './errors/failed_to_apply_requested_changes_error'; import { FailedToDetermineElasticsearchActionsError } from './errors/failed_to_determine_elasticsearch_actions_error'; @@ -85,7 +85,7 @@ export class State { } else { const lmService = dependencies.lockManager; return lmService - .withLock('streams_api', async () => { + .withLock('streams/apply_changes', async () => { try { await desiredState.commitChanges(startingState); return { status: 'success' as const, changes: desiredState.changes(startingState) }; @@ -101,7 +101,7 @@ export class State { } }) .catch((error) => { - if (error instanceof LockAcquisitionError) { + if (isLockAcquisitionError(error)) { throw new ConcurrentAccessError('Could not acquire lock for applying changes'); } throw error;