From 5171bdbd212411c7b2c35c2cf28caca83dd69758 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 16 Jun 2025 17:52:26 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=8C=8A=20Streams:=20Prevent=20concurrent?= =?UTF-8?q?=20access=20(#222961)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR guards changes to the streams state that go through `State.attemptChanges` via the newly introduced lock manager. If two requests are happening at the same time, one of them now fails with a 409. ## Concerns * Lock expiry is 30s for now - is this too little? Should be good enough for now, maybe we need to reconsider once we introduce the bulk api * This is only guarding changes that go through the `State` class - some things like queries and dashboards do not, so they can still be subject to race conditions. We could sprinkle more locks over the code base, but I would like to solve this by moving them into `State` as well, that seems like the cleaner approach, even though a bit more effort * Biggest question - on this PR the concurrent request fails directly with a 409. Is this OK or should it wait and retry a couple times? I'm in favor of starting like this and seeing if this is actually a problem. 
--------- Co-authored-by: kibanamachine <42973632+kibanamachine@users.noreply.github.com> Co-authored-by: Kevin Lacabane (cherry picked from commit a8b2ac6c48fd65bbdccce4ac3608f59b7590da9c) --- .../streams/server/lib/streams/client.ts | 4 +- .../streams/server/lib/streams/service.ts | 2 + .../errors/concurrent_access_error.ts | 15 ++++ .../streams/state_management/state.test.ts | 4 ++ .../lib/streams/state_management/state.ts | 38 ++++++---- .../lib/streams/state_management/types.ts | 2 + .../plugins/shared/streams/tsconfig.json | 3 +- .../apis/observability/streams/conflicts.ts | 70 +++++++++++++++++++ .../apis/observability/streams/index.ts | 1 + 9 files changed, 124 insertions(+), 15 deletions(-) create mode 100644 x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/errors/concurrent_access_error.ts create mode 100644 x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/conflicts.ts diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts index 9544fd69c6e97..0dc9eb607f1ed 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts @@ -11,9 +11,10 @@ import { QueryDslQueryContainer, Result, } from '@elastic/elasticsearch/lib/api/types'; -import type { IScopedClusterClient, KibanaRequest, Logger } from '@kbn/core/server'; +import type { IScopedClusterClient, Logger, KibanaRequest } from '@kbn/core/server'; import { isNotFoundError } from '@kbn/es-errors'; import { Condition, Streams, getAncestors, getParentId } from '@kbn/streams-schema'; +import { LockManagerService } from '@kbn/lock-manager'; import { AssetClient } from './assets/asset_client'; import { ASSET_ID, ASSET_TYPE } from './assets/fields'; import { QueryClient } from './assets/query/query_client'; @@ -58,6 +59,7 @@ function wrapEsCall(p: Promise): Promise { export class 
StreamsClient { constructor( private readonly dependencies: { + lockManager: LockManagerService; scopedClusterClient: IScopedClusterClient; assetClient: AssetClient; queryClient: QueryClient; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts index 4ee127b5d9e96..9e5ff87e483b4 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts @@ -8,6 +8,7 @@ import type { CoreSetup, KibanaRequest, Logger } from '@kbn/core/server'; import { IStorageClient, StorageIndexAdapter, StorageSettings, types } from '@kbn/storage-adapter'; import { Streams } from '@kbn/streams-schema'; +import { LockManagerService } from '@kbn/lock-manager'; import type { StreamsPluginStartDependencies } from '../../types'; import { AssetClient } from './assets/asset_client'; import { QueryClient } from './assets/query/query_client'; @@ -66,6 +67,7 @@ export class StreamsService { queryClient, logger, scopedClusterClient, + lockManager: new LockManagerService(this.coreSetup, logger), storageClient: storageAdapter.getClient(), request, isServerless, diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/errors/concurrent_access_error.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/errors/concurrent_access_error.ts new file mode 100644 index 0000000000000..c77455b13bfab --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/errors/concurrent_access_error.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. 
+ */ + +import { StatusError } from '../../errors/status_error'; + +export class ConcurrentAccessError extends StatusError { + constructor(message: string) { + super(message, 409); + this.name = 'ConcurrentAccessError'; + } +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.test.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.test.ts index db58a15400271..36dfe87e083c0 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.test.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.test.ts @@ -21,6 +21,7 @@ import { StreamChange } from './types'; import { ElasticsearchAction } from './execution_plan/types'; import { ExecutionPlan } from './execution_plan/execution_plan'; import { Streams } from '@kbn/streams-schema'; +import { LockManagerService } from '@kbn/lock-manager'; describe('State', () => { const searchMock = jest.fn(); @@ -29,6 +30,9 @@ describe('State', () => { }; const stateDependenciesMock = { storageClient: storageClientMock, + lockManager: { + withLock: (_, cb) => cb(), + } as LockManagerService, isDev: true, } as any; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts index e613cc5caa132..34b663beb125a 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/state.ts @@ -6,6 +6,7 @@ */ import { difference, intersection, isEqual } from 'lodash'; +import { isLockAcquisitionError } from '@kbn/lock-manager'; import { StatusError } from '../errors/status_error'; import { FailedToApplyRequestedChangesError } from './errors/failed_to_apply_requested_changes_error'; import { FailedToDetermineElasticsearchActionsError } from 
'./errors/failed_to_determine_elasticsearch_actions_error'; @@ -20,6 +21,7 @@ import type { } from './stream_active_record/stream_active_record'; import { streamFromDefinition } from './stream_active_record/stream_from_definition'; import type { StateDependencies, StreamChange } from './types'; +import { ConcurrentAccessError } from './errors/concurrent_access_error'; interface Changes { created: string[]; @@ -81,19 +83,29 @@ export class State { elasticsearchActions, }; } else { - try { - await desiredState.commitChanges(startingState); - return { status: 'success', changes: desiredState.changes(startingState) }; - } catch (error) { - await desiredState.attemptRollback(startingState, error); - return { - status: 'failed_with_rollback', - error: new StatusError( - `Failed to apply changes but successfully rolled back to previous state: ${error.message}`, - error.statusCode ?? 500 - ), - }; - } + const lmService = dependencies.lockManager; + return lmService + .withLock('streams/apply_changes', async () => { + try { + await desiredState.commitChanges(startingState); + return { status: 'success' as const, changes: desiredState.changes(startingState) }; + } catch (error) { + await desiredState.attemptRollback(startingState, error); + return { + status: 'failed_with_rollback' as const, + error: new StatusError( + `Failed to apply changes but successfully rolled back to previous state: ${error.message}`, + error.statusCode ?? 
500 + ), + }; + } + }) + .catch((error) => { + if (isLockAcquisitionError(error)) { + throw new ConcurrentAccessError('Could not acquire lock for applying changes'); + } + throw error; + }); } } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts index 0eec791722235..6af295e71d5ab 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/state_management/types.ts @@ -7,6 +7,7 @@ import type { IScopedClusterClient, Logger } from '@kbn/core/server'; import { Streams } from '@kbn/streams-schema'; +import { LockManagerService } from '@kbn/lock-manager'; import type { AssetClient } from '../assets/asset_client'; import type { StreamsClient } from '../client'; import type { StreamsStorageClient } from '../service'; @@ -25,6 +26,7 @@ export type StreamChange = StreamUpsertChange | StreamDeleteChange; export interface StateDependencies { logger: Logger; + lockManager: LockManagerService; streamsClient: StreamsClient; storageClient: StreamsStorageClient; scopedClusterClient: IScopedClusterClient; diff --git a/x-pack/platform/plugins/shared/streams/tsconfig.json b/x-pack/platform/plugins/shared/streams/tsconfig.json index a1122b1fb8b10..95768c441dbdc 100644 --- a/x-pack/platform/plugins/shared/streams/tsconfig.json +++ b/x-pack/platform/plugins/shared/streams/tsconfig.json @@ -50,6 +50,7 @@ "@kbn/i18n", "@kbn/zod-helpers", "@kbn/core-http-server-utils", - "@kbn/inference-common" + "@kbn/inference-common", + "@kbn/lock-manager" ] } diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/conflicts.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/conflicts.ts new file mode 100644 index 0000000000000..cf4f495e73c29 --- /dev/null +++ 
b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/conflicts.ts @@ -0,0 +1,70 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context'; +import { disableStreams, enableStreams, forkStream } from './helpers/requests'; +import { + StreamsSupertestRepositoryClient, + createStreamsRepositoryAdminClient, +} from './helpers/repository_client'; + +export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { + const roleScopedSupertest = getService('roleScopedSupertest'); + let apiClient: StreamsSupertestRepositoryClient; + + describe('conflict handling', function () { + before(async () => { + apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest); + await enableStreams(apiClient); + }); + + after(async () => { + await disableStreams(apiClient); + }); + + it('should not allow multiple requests manipulating streams state at once', async () => { + const stream1 = { + stream: { + name: 'logs.nginx', + }, + if: { + field: 'resource.attributes.host.name', + operator: 'eq' as const, + value: 'routeme', + }, + }; + const stream2 = { + stream: { + name: 'logs.apache', + }, + if: { + field: 'resource.attributes.host.name', + operator: 'eq' as const, + value: 'routeme2', + }, + }; + const responses = await Promise.allSettled([ + forkStream(apiClient, 'logs', stream1), + forkStream(apiClient, 'logs', stream2), + ]); + // Assert that one of the requests failed with a conflict error and the other succeeded + // It needs to check either way (success or failure) because the order of execution is not guaranteed + expect(responses).to.have.length(2); + const successResponse = responses.find( + (response) 
=> response.status === 'fulfilled' && response.value.acknowledged + ); + const conflictResponse = responses.find( + (response) => + response.status === 'rejected' && + String(response.reason).toLowerCase().includes('conflict') + ); + expect(successResponse).to.not.be(undefined); + expect(conflictResponse).to.not.be(undefined); + }); + }); +} diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts index e0069d24052cf..e8e0621f0b1ed 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts @@ -26,5 +26,6 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext) loadTestFile(require.resolve('./content')); loadTestFile(require.resolve('./migration_on_read')); loadTestFile(require.resolve('./meta_data')); + loadTestFile(require.resolve('./conflicts')); }); }