Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import {
QueryDslQueryContainer,
Result,
} from '@elastic/elasticsearch/lib/api/types';
import type { IScopedClusterClient, KibanaRequest, Logger } from '@kbn/core/server';
import type { IScopedClusterClient, Logger, KibanaRequest } from '@kbn/core/server';
import { isNotFoundError } from '@kbn/es-errors';
import { Condition, Streams, getAncestors, getParentId } from '@kbn/streams-schema';
import { LockManagerService } from '@kbn/lock-manager';
import { AssetClient } from './assets/asset_client';
import { ASSET_ID, ASSET_TYPE } from './assets/fields';
import { QueryClient } from './assets/query/query_client';
Expand Down Expand Up @@ -58,6 +59,7 @@ function wrapEsCall<T>(p: Promise<T>): Promise<T> {
export class StreamsClient {
constructor(
private readonly dependencies: {
lockManager: LockManagerService;
scopedClusterClient: IScopedClusterClient;
assetClient: AssetClient;
queryClient: QueryClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import type { CoreSetup, KibanaRequest, Logger } from '@kbn/core/server';
import { IStorageClient, StorageIndexAdapter, StorageSettings, types } from '@kbn/storage-adapter';
import { Streams } from '@kbn/streams-schema';
import { LockManagerService } from '@kbn/lock-manager';
import type { StreamsPluginStartDependencies } from '../../types';
import { AssetClient } from './assets/asset_client';
import { QueryClient } from './assets/query/query_client';
Expand Down Expand Up @@ -66,6 +67,7 @@ export class StreamsService {
queryClient,
logger,
scopedClusterClient,
lockManager: new LockManagerService(this.coreSetup, logger),
storageClient: storageAdapter.getClient(),
request,
isServerless,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { StatusError } from '../../errors/status_error';

export class ConcurrentAccessError extends StatusError {
constructor(message: string) {
super(message, 409);
this.name = 'ConcurrentAccessError';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { StreamChange } from './types';
import { ElasticsearchAction } from './execution_plan/types';
import { ExecutionPlan } from './execution_plan/execution_plan';
import { Streams } from '@kbn/streams-schema';
import { LockManagerService } from '@kbn/lock-manager';

describe('State', () => {
const searchMock = jest.fn();
Expand All @@ -29,6 +30,9 @@ describe('State', () => {
};
const stateDependenciesMock = {
storageClient: storageClientMock,
lockManager: {
withLock: (_, cb) => cb(),
} as LockManagerService,
isDev: true,
} as any;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { difference, intersection, isEqual } from 'lodash';
import { isLockAcquisitionError } from '@kbn/lock-manager';
import { StatusError } from '../errors/status_error';
import { FailedToApplyRequestedChangesError } from './errors/failed_to_apply_requested_changes_error';
import { FailedToDetermineElasticsearchActionsError } from './errors/failed_to_determine_elasticsearch_actions_error';
Expand All @@ -20,6 +21,7 @@ import type {
} from './stream_active_record/stream_active_record';
import { streamFromDefinition } from './stream_active_record/stream_from_definition';
import type { StateDependencies, StreamChange } from './types';
import { ConcurrentAccessError } from './errors/concurrent_access_error';

interface Changes {
created: string[];
Expand Down Expand Up @@ -81,19 +83,29 @@ export class State {
elasticsearchActions,
};
} else {
try {
await desiredState.commitChanges(startingState);
return { status: 'success', changes: desiredState.changes(startingState) };
} catch (error) {
await desiredState.attemptRollback(startingState, error);
return {
status: 'failed_with_rollback',
error: new StatusError(
`Failed to apply changes but successfully rolled back to previous state: ${error.message}`,
error.statusCode ?? 500
),
};
}
const lmService = dependencies.lockManager;
return lmService
.withLock('streams/apply_changes', async () => {
try {
await desiredState.commitChanges(startingState);
return { status: 'success' as const, changes: desiredState.changes(startingState) };
} catch (error) {
await desiredState.attemptRollback(startingState, error);
return {
status: 'failed_with_rollback' as const,
error: new StatusError(
`Failed to apply changes but successfully rolled back to previous state: ${error.message}`,
error.statusCode ?? 500
),
};
}
})
.catch((error) => {
if (isLockAcquisitionError(error)) {
throw new ConcurrentAccessError('Could not acquire lock for applying changes');
}
throw error;
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import type { IScopedClusterClient, Logger } from '@kbn/core/server';
import { Streams } from '@kbn/streams-schema';
import { LockManagerService } from '@kbn/lock-manager';
import type { AssetClient } from '../assets/asset_client';
import type { StreamsClient } from '../client';
import type { StreamsStorageClient } from '../service';
Expand All @@ -25,6 +26,7 @@ export type StreamChange = StreamUpsertChange | StreamDeleteChange;

export interface StateDependencies {
logger: Logger;
lockManager: LockManagerService;
streamsClient: StreamsClient;
storageClient: StreamsStorageClient;
scopedClusterClient: IScopedClusterClient;
Expand Down
3 changes: 2 additions & 1 deletion x-pack/platform/plugins/shared/streams/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
"@kbn/i18n",
"@kbn/zod-helpers",
"@kbn/core-http-server-utils",
"@kbn/inference-common"
"@kbn/inference-common",
"@kbn/lock-manager"
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import expect from '@kbn/expect';
import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context';
import { disableStreams, enableStreams, forkStream } from './helpers/requests';
import {
StreamsSupertestRepositoryClient,
createStreamsRepositoryAdminClient,
} from './helpers/repository_client';

export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const roleScopedSupertest = getService('roleScopedSupertest');
let apiClient: StreamsSupertestRepositoryClient;

describe('conflict handling', function () {
before(async () => {
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
await enableStreams(apiClient);
});

after(async () => {
await disableStreams(apiClient);
});

it('should not allow multiple requests manipulating streams state at once', async () => {
const stream1 = {
stream: {
name: 'logs.nginx',
},
if: {
field: 'resource.attributes.host.name',
operator: 'eq' as const,
value: 'routeme',
},
};
const stream2 = {
stream: {
name: 'logs.apache',
},
if: {
field: 'resource.attributes.host.name',
operator: 'eq' as const,
value: 'routeme2',
},
};
const responses = await Promise.allSettled([
forkStream(apiClient, 'logs', stream1),
forkStream(apiClient, 'logs', stream2),
]);
// Assert than one of the requests failed with a conflict error and the other succeeded
// It needs to check either way (success or failure) because the order of execution is not guaranteed
expect(responses).to.have.length(2);
const successResponse = responses.find(
(response) => response.status === 'fulfilled' && response.value.acknowledged
);
const conflictResponse = responses.find(
(response) =>
response.status === 'rejected' &&
String(response.reason).toLowerCase().includes('conflict')
);
expect(successResponse).to.not.be(undefined);
expect(conflictResponse).to.not.be(undefined);
});
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext)
loadTestFile(require.resolve('./content'));
loadTestFile(require.resolve('./migration_on_read'));
loadTestFile(require.resolve('./meta_data'));
loadTestFile(require.resolve('./conflicts'));
});
}