Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,19 @@ export class QueryClient {
await this.bulk(stream, [{ delete: { id: queryId } }]);
}

public async deleteAll(stream: string) {
if (!this.isSignificantEventsEnabled) {
this.dependencies.logger.debug(
`Skipping deleteAll for stream "${stream}" because significant events feature is disabled.`
);
return;
}

const currentQueryLinks = await this.dependencies.assetClient.getAssetLinks(stream, ['query']);
const queriesToDelete = currentQueryLinks.map((link) => ({ delete: { id: link.query.id } }));
await this.bulk(stream, queriesToDelete);
}

public async bulk(
stream: string,
operations: Array<{ index?: StreamQuery; delete?: { id: string } }>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,6 @@ export class StreamsClient {
}
);

await this.dependencies.queryClient.syncQueries(name, []);

return { acknowledged: true, result: 'deleted' };
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import type {
DeleteDotStreamsDocumentAction,
DeleteIndexTemplateAction,
DeleteIngestPipelineAction,
DeleteQueriesAction,
ElasticsearchAction,
UpdateLifecycleAction,
UpsertComponentTemplateAction,
Expand Down Expand Up @@ -74,6 +75,7 @@ export class ExecutionPlan {
delete_datastream: [],
upsert_dot_streams_document: [],
delete_dot_streams_document: [],
delete_queries: [],
};
}

Expand Down Expand Up @@ -158,6 +160,7 @@ export class ExecutionPlan {
delete_datastream,
upsert_dot_streams_document,
delete_dot_streams_document,
delete_queries,
...rest
} = this.actionsByType;
assertEmptyObject(rest);
Expand Down Expand Up @@ -192,6 +195,7 @@ export class ExecutionPlan {
await Promise.all([
this.deleteComponentTemplates(delete_component_template),
this.deleteIngestPipelines(delete_ingest_pipeline),
this.deleteQueries(delete_queries),
]);

await this.upsertAndDeleteDotStreamsDocuments([
Expand All @@ -205,6 +209,16 @@ export class ExecutionPlan {
}
}

private async deleteQueries(actions: DeleteQueriesAction[]) {
if (actions.length === 0) {
return;
}

return Promise.all(
actions.map((action) => this.dependencies.queryClient.deleteAll(action.request.name))
);
}

private async upsertComponentTemplates(actions: UpsertComponentTemplateAction[]) {
return Promise.all(
actions.map((action) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ export function getRequiredPermissionsForActions({
// since they are done by the kibana system user
upsert_dot_streams_document,
delete_dot_streams_document,
delete_queries,
...rest
} = actionsByType;
assertEmptyObject(rest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1058,5 +1058,6 @@ function emptyActionsByType(): ActionsByType {
delete_datastream: [],
upsert_dot_streams_document: [],
delete_dot_streams_document: [],
delete_queries: [],
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,13 @@ export interface DeleteDotStreamsDocumentAction {
};
}

export interface DeleteQueriesAction {
type: 'delete_queries';
request: {
name: string;
};
}

export type ElasticsearchAction =
| UpsertComponentTemplateAction
| DeleteComponentTemplateAction
Expand All @@ -122,7 +129,8 @@ export type ElasticsearchAction =
| UpdateLifecycleAction
| DeleteDatastreamAction
| UpsertDotStreamsDocumentAction
| DeleteDotStreamsDocumentAction;
| DeleteDotStreamsDocumentAction
| DeleteQueriesAction;

export interface ActionsByType {
upsert_component_template: UpsertComponentTemplateAction[];
Expand All @@ -139,4 +147,5 @@ export interface ActionsByType {
delete_datastream: DeleteDatastreamAction[];
upsert_dot_streams_document: UpsertDotStreamsDocumentAction[];
delete_dot_streams_document: DeleteDotStreamsDocumentAction[];
delete_queries: DeleteQueriesAction[];
}
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,12 @@ export class ClassicStream extends StreamActiveRecord<Streams.ClassicStream.Defi
name: this._definition.name,
},
},
{
type: 'delete_queries',
request: {
name: this._definition.name,
},
},
];

if (this._definition.ingest.processing.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,12 @@ export class WiredStream extends StreamActiveRecord<Streams.WiredStream.Definiti
name: this._definition.name,
},
},
{
type: 'delete_queries',
request: {
name: this._definition.name,
},
},
];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { LockManagerService } from '@kbn/lock-manager';
import type { AssetClient } from '../assets/asset_client';
import type { StreamsClient } from '../client';
import type { StreamsStorageClient } from '../service';
import { QueryClient } from '../assets/query/query_client';

interface StreamUpsertChange {
type: 'upsert';
Expand All @@ -31,6 +32,7 @@ export interface StateDependencies {
storageClient: StreamsStorageClient;
scopedClusterClient: IScopedClusterClient;
assetClient: AssetClient;
queryClient: QueryClient;
isServerless: boolean;
isDev: boolean;
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,23 @@ export async function getStream(
.then((response) => response.body);
}

export async function deleteStream(
apiClient: StreamsSupertestRepositoryClient,
name: string,
expectStatusCode: number = 200
) {
return await apiClient
.fetch('DELETE /api/streams/{name} 2023-10-31', {
params: {
path: {
name,
},
},
})
.expect(expectStatusCode)
.then((response) => response.body);
}

export async function getIlmStats(
apiClient: StreamsSupertestRepositoryClient,
name: string,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@ import {
StreamsSupertestRepositoryClient,
createStreamsRepositoryAdminClient,
} from './helpers/repository_client';
import { disableStreams, enableStreams, getStream, putStream } from './helpers/requests';
import {
deleteStream,
disableStreams,
enableStreams,
getStream,
putStream,
} from './helpers/requests';
import { RoleCredentials } from '../../services';

export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
const roleScopedSupertest = getService('roleScopedSupertest');
const esClient = getService('es');
const kibanaServer = getService('kibanaServer');
const alertingApi = getService('alertingApiCommon');
const samlAuth = getService('samlAuth');
let roleAuthc: RoleCredentials;
let apiClient: StreamsSupertestRepositoryClient;

describe('Significant Events', function () {
before(async () => {
roleAuthc = await samlAuth.createM2mApiKeyWithRoleScope('admin');
apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest);
await enableStreams(apiClient);
await kibanaServer.uiSettings.update({
Expand All @@ -43,30 +54,138 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) {
});

describe('Wired streams update', () => {
it('updates the queries', async () => {
let streamDefinition = await getStream(apiClient, 'logs');
expect(streamDefinition.queries.length).to.eql(0);

const response = await putStream(apiClient, 'logs', {
stream: {
description: '',
ingest: {
...(streamDefinition as Streams.WiredStream.GetResponse).stream.ingest,
},
const STREAM_NAME = 'logs.queries-test';
const stream: Streams.WiredStream.UpsertRequest['stream'] = {
description: '',
ingest: {
lifecycle: { inherit: {} },
processing: [],
wired: {
routing: [],
fields: {},
},
},
};

beforeEach(async () => {
await putStream(apiClient, STREAM_NAME, {
stream,
dashboards: [],
queries: [],
}).then((response) => expect(response).to.have.property('acknowledged', true));
await alertingApi.deleteRules({ roleAuthc });
});

it('updates the queries', async () => {
const response = await putStream(apiClient, STREAM_NAME, {
stream,
dashboards: [],
queries: [{ id: 'aaa', title: 'OOM Error', kql: { query: "message: 'OOM Error'" } }],
});
expect(response).to.have.property('acknowledged', true);

streamDefinition = await getStream(apiClient, 'logs');
const streamDefinition = await getStream(apiClient, STREAM_NAME);
expect(streamDefinition.queries.length).to.eql(1);
expect(streamDefinition.queries[0]).to.eql({
id: 'aaa',
title: 'OOM Error',
kql: { query: "message: 'OOM Error'" },
});
});

it('deletes all queries on stream and its children', async () => {
let response = await putStream(apiClient, STREAM_NAME, {
stream: {
...stream,
ingest: {
...stream.ingest,
wired: {
...stream.ingest.wired,
routing: [
{
destination: 'logs.queries-test.child',
if: {
always: {},
},
},
],
},
},
},
dashboards: [],
queries: [
{
id: 'logs.queries-test.query1',
title: 'should not be deleted',
kql: { query: 'message:"irrelevant"' },
},
],
});
expect(response).to.have.property('acknowledged', true);

response = await putStream(apiClient, 'logs.queries-test.child', {
stream: {
...stream,
ingest: {
...stream.ingest,
wired: {
...stream.ingest.wired,
routing: [
{
destination: 'logs.queries-test.child.first',
if: {
field: 'attributes.field',
operator: 'lt',
value: 15,
},
},
{
destination: 'logs.queries-test.child.second',
if: {
field: 'attributes.field',
operator: 'gt',
value: 15,
},
},
],
},
},
},
dashboards: [],
queries: [
{
id: 'logs.queries-test.child.query1',
title: 'must be deleted',
kql: { query: 'message:"irrelevant"' },
},
],
});
expect(response).to.have.property('acknowledged', true);

response = await putStream(apiClient, 'logs.queries-test.child.first', {
stream,
dashboards: [],
queries: [
{
id: 'logs.queries-test.child.first.query1',
title: 'must be deleted',
kql: { query: 'message:"irrelevant"' },
},
{
id: 'logs.queries-test.child.first.query2',
title: 'must be deleted',
kql: { query: 'message:"irrelevant"' },
},
],
});
expect(response).to.have.property('acknowledged', true);

await deleteStream(apiClient, 'logs.queries-test.child');

const rules = await alertingApi.searchRules(roleAuthc, '');
expect(rules.body.data).to.have.length(1);
expect(rules.body.data[0].name).to.eql('should not be deleted');
});
});

describe('Classic streams update', () => {
Expand Down