Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import {
QueryDslQueryContainer,
Result,
} from '@elastic/elasticsearch/lib/api/types';
import type { IScopedClusterClient, KibanaRequest, Logger } from '@kbn/core/server';
import type { IScopedClusterClient, Logger, KibanaRequest } from '@kbn/core/server';
import { isNotFoundError } from '@kbn/es-errors';
import { Condition, Streams, getAncestors, getParentId } from '@kbn/streams-schema';
import { LockManagerService } from '@kbn/lock-manager';
import { AssetClient } from './assets/asset_client';
import { ASSET_ID, ASSET_TYPE } from './assets/fields';
import { QueryClient } from './assets/query/query_client';
Expand Down Expand Up @@ -58,6 +59,7 @@ function wrapEsCall<T>(p: Promise<T>): Promise<T> {
export class StreamsClient {
constructor(
private readonly dependencies: {
lockManager: LockManagerService;
scopedClusterClient: IScopedClusterClient;
assetClient: AssetClient;
queryClient: QueryClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import type { CoreSetup, KibanaRequest, Logger } from '@kbn/core/server';
import { IStorageClient, StorageIndexAdapter, StorageSettings, types } from '@kbn/storage-adapter';
import { Streams } from '@kbn/streams-schema';
import { LockManagerService } from '@kbn/lock-manager';
import type { StreamsPluginStartDependencies } from '../../types';
import { AssetClient } from './assets/asset_client';
import { QueryClient } from './assets/query/query_client';
Expand Down Expand Up @@ -66,6 +67,7 @@ export class StreamsService {
queryClient,
logger,
scopedClusterClient,
lockManager: new LockManagerService(this.coreSetup, logger),
storageClient: storageAdapter.getClient(),
request,
isServerless,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from '../../errors/status_error';

export class ConcurrentAccessError extends StatusError {
constructor(message: string) {
super(message, 409);
this.name = 'ConcurrentAccessError';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { StreamChange } from './types';
import { ElasticsearchAction } from './execution_plan/types';
import { ExecutionPlan } from './execution_plan/execution_plan';
import { Streams } from '@kbn/streams-schema';
import { LockManagerService } from '@kbn/lock-manager';

describe('State', () => {
const searchMock = jest.fn();
Expand All @@ -29,6 +30,9 @@ describe('State', () => {
};
const stateDependenciesMock = {
storageClient: storageClientMock,
lockManager: {
withLock: (_, cb) => cb(),
} as LockManagerService,
isDev: true,
} as any;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { difference, intersection, isEqual } from 'lodash';
import { isLockAcquisitionError } from '@kbn/lock-manager';
import { StatusError } from '../errors/status_error';
import { FailedToApplyRequestedChangesError } from './errors/failed_to_apply_requested_changes_error';
import { FailedToDetermineElasticsearchActionsError } from './errors/failed_to_determine_elasticsearch_actions_error';
Expand All @@ -20,6 +21,7 @@ import type {
} from './stream_active_record/stream_active_record';
import { streamFromDefinition } from './stream_active_record/stream_from_definition';
import type { StateDependencies, StreamChange } from './types';
import { ConcurrentAccessError } from './errors/concurrent_access_error';

interface Changes {
created: string[];
Expand Down Expand Up @@ -81,19 +83,29 @@ export class State {
elasticsearchActions,
};
} else {
try {
await desiredState.commitChanges(startingState);
return { status: 'success', changes: desiredState.changes(startingState) };
} catch (error) {
await desiredState.attemptRollback(startingState, error);
return {
status: 'failed_with_rollback',
error: new StatusError(
`Failed to apply changes but successfully rolled back to previous state: ${error.message}`,
error.statusCode ?? 500
),
};
}
const lmService = dependencies.lockManager;
return lmService
.withLock('streams/apply_changes', async () => {
try {
await desiredState.commitChanges(startingState);
return { status: 'success' as const, changes: desiredState.changes(startingState) };
} catch (error) {
await desiredState.attemptRollback(startingState, error);
return {
status: 'failed_with_rollback' as const,
error: new StatusError(
`Failed to apply changes but successfully rolled back to previous state: ${error.message}`,
error.statusCode ?? 500
),
};
}
})
.catch((error) => {
if (isLockAcquisitionError(error)) {
throw new ConcurrentAccessError('Could not acquire lock for applying changes');
}
throw error;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import type { IScopedClusterClient, Logger } from '@kbn/core/server';
import { Streams } from '@kbn/streams-schema';
import { LockManagerService } from '@kbn/lock-manager';
import type { AssetClient } from '../assets/asset_client';
import type { StreamsClient } from '../client';
import type { StreamsStorageClient } from '../service';
Expand All @@ -25,6 +26,7 @@ export type StreamChange = StreamUpsertChange | StreamDeleteChange;

export interface StateDependencies {
logger: Logger;
lockManager: LockManagerService;
streamsClient: StreamsClient;
storageClient: StreamsStorageClient;
scopedClusterClient: IScopedClusterClient;
Expand Down
3 changes: 2 additions & 1 deletion x-pack/platform/plugins/shared/streams/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"@kbn/i18n",
"@kbn/zod-helpers",
"@kbn/core-http-server-utils",
"@kbn/inference-common"
"@kbn/inference-common",
"@kbn/lock-manager"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import expect from '@kbn/expect';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import { disableStreams, enableStreams, forkStream } from './helpers/requests';
import {
StreamsSupertestRepositoryClient,
createStreamsRepositoryAdminClient,
} from './helpers/repository_client';

export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const roleScopedSupertest = getService('roleScopedSupertest');
let apiClient: StreamsSupertestRepositoryClient;

describe('conflict handling', function () {
before(async () => {
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
await enableStreams(apiClient);
});

after(async () => {
await disableStreams(apiClient);
});

it('should not allow multiple requests manipulating streams state at once', async () => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fork should be slow enough that we won't get flaky results on this ? perhaps to be safe we could trigger an even longer request, like a upsert that creates 2-3 child from root

const stream1 = {
stream: {
name: 'logs.nginx',
},
if: {
field: 'resource.attributes.host.name',
operator: 'eq' as const,
value: 'routeme',
},
};
const stream2 = {
stream: {
name: 'logs.apache',
},
if: {
field: 'resource.attributes.host.name',
operator: 'eq' as const,
value: 'routeme2',
},
};
const responses = await Promise.allSettled([
forkStream(apiClient, 'logs', stream1),
forkStream(apiClient, 'logs', stream2),
]);
// Assert than one of the requests failed with a conflict error and the other succeeded
// It needs to check either way (success or failure) because the order of execution is not guaranteed
expect(responses).to.have.length(2);
const successResponse = responses.find(
(response) => response.status === 'fulfilled' && response.value.acknowledged
);
const conflictResponse = responses.find(
(response) =>
response.status === 'rejected' &&
String(response.reason).toLowerCase().includes('conflict')
);
expect(successResponse).to.not.be(undefined);
expect(conflictResponse).to.not.be(undefined);
});
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext)
loadTestFile(require.resolve('./content'));
loadTestFile(require.resolve('./migration_on_read'));
loadTestFile(require.resolve('./meta_data'));
loadTestFile(require.resolve('./conflicts'));
});
}