Skip to content

Commit c337198

Browse files
committed
Update logstash pipeline management to use system index APIs
This change updates the logstash pipeline management plugin to use pipeline management APIs in Elasticsearch rather than directly accessing the .logstash index. In Elasticsearch 8.0, direct access to system indices will no longer be allowed when using standard APIs. Given this change, a new set of APIs has been created specifically for the management of Logstash pipelines and this change makes use of the APIs. Relates elastic/elasticsearch#53350
1 parent 79a2b64 commit c337198

File tree

13 files changed

+68
-196
lines changed

13 files changed

+68
-196
lines changed

x-pack/plugins/logstash/server/lib/fetch_all_from_scroll/fetch_all_from_scroll.test.ts

Lines changed: 0 additions & 71 deletions
This file was deleted.

x-pack/plugins/logstash/server/lib/fetch_all_from_scroll/fetch_all_from_scroll.ts

Lines changed: 0 additions & 34 deletions
This file was deleted.

x-pack/plugins/logstash/server/lib/fetch_all_from_scroll/index.ts

Lines changed: 0 additions & 7 deletions
This file was deleted.

x-pack/plugins/logstash/server/models/pipeline/pipeline.test.ts

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@ describe('pipeline', () => {
1010
describe('Pipeline', () => {
1111
describe('fromUpstreamJSON factory method', () => {
1212
const upstreamJSON = {
13-
_id: 'apache',
14-
_source: {
13+
apache: {
1514
description: 'this is an apache pipeline',
1615
pipeline_metadata: {
1716
version: 1,
@@ -21,25 +20,23 @@ describe('pipeline', () => {
2120
pipeline: 'input {} filter { grok {} }\n output {}',
2221
},
2322
};
23+
const upstreamId = 'apache';
2424

2525
it('returns correct Pipeline instance', () => {
2626
const pipeline = Pipeline.fromUpstreamJSON(upstreamJSON);
27-
expect(pipeline.id).toBe(upstreamJSON._id);
28-
expect(pipeline.description).toBe(upstreamJSON._source.description);
29-
expect(pipeline.username).toBe(upstreamJSON._source.username);
30-
expect(pipeline.pipeline).toBe(upstreamJSON._source.pipeline);
27+
expect(pipeline.id).toBe(upstreamId);
28+
expect(pipeline.description).toBe(upstreamJSON.apache.description);
29+
expect(pipeline.username).toBe(upstreamJSON.apache.username);
30+
expect(pipeline.pipeline).toBe(upstreamJSON.apache.pipeline);
3131
});
3232

33-
it('throws if pipeline argument does not contain an id property', () => {
34-
const badJSON = {
35-
// no _id
36-
_source: upstreamJSON._source,
37-
};
33+
it('throws if pipeline argument does not contain id as a key', () => {
34+
const badJSON = {};
3835
const testFromUpstreamJsonError = () => {
3936
return Pipeline.fromUpstreamJSON(badJSON);
4037
};
4138
expect(testFromUpstreamJsonError).toThrowError(
42-
/upstreamPipeline argument must contain an id property/i
39+
/upstreamPipeline argument must contain pipeline id as a key/i
4340
);
4441
});
4542
});

x-pack/plugins/logstash/server/models/pipeline/pipeline.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,21 @@ export class Pipeline {
9393

9494
// generate Pipeline object from elasticsearch response
9595
static fromUpstreamJSON(upstreamPipeline: Record<string, any>) {
96-
if (!upstreamPipeline._id) {
96+
if (Object.keys(upstreamPipeline).length !== 1) {
9797
throw badRequest(
9898
i18n.translate(
9999
'xpack.logstash.upstreamPipelineArgumentMustContainAnIdPropertyErrorMessage',
100100
{
101-
defaultMessage: 'upstreamPipeline argument must contain an id property',
101+
defaultMessage: 'upstreamPipeline argument must contain pipeline id as a key',
102102
}
103103
)
104104
);
105105
}
106-
const id = get(upstreamPipeline, '_id') as string;
107-
const description = get(upstreamPipeline, '_source.description') as string;
108-
const username = get(upstreamPipeline, '_source.username') as string;
109-
const pipeline = get(upstreamPipeline, '_source.pipeline') as string;
110-
const settings = get(upstreamPipeline, '_source.pipeline_settings') as Record<string, any>;
106+
const id = Object.keys(upstreamPipeline).pop() as string;
107+
const description = get(upstreamPipeline, id + '.description') as string;
108+
const username = get(upstreamPipeline, id + '.username') as string;
109+
const pipeline = get(upstreamPipeline, id + '.pipeline') as string;
110+
const settings = get(upstreamPipeline, id + '.pipeline_settings') as Record<string, any>;
111111

112112
const opts: PipelineOptions = { id, description, username, pipeline, settings };
113113

x-pack/plugins/logstash/server/models/pipeline_list_item/pipeline_list_item.test.ts

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ import { PipelineListItem } from './pipeline_list_item';
99
describe('pipeline_list_item', () => {
1010
describe('PipelineListItem', () => {
1111
const upstreamJSON = {
12-
_id: 'apache',
13-
_source: {
12+
apache: {
1413
description: 'this is an apache pipeline',
1514
last_modified: '2017-05-14T02:50:51.250Z',
1615
pipeline_metadata: {
@@ -20,24 +19,22 @@ describe('pipeline_list_item', () => {
2019
username: 'elastic',
2120
pipeline: 'input {} filter { grok {} }\n output {}',
2221
},
23-
_index: 'index',
24-
_type: 'type',
25-
_score: 100,
2622
};
23+
const upstreamId = 'apache';
2724

2825
describe('fromUpstreamJSON factory method', () => {
2926
it('returns correct PipelineListItem instance', () => {
30-
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamJSON);
31-
expect(pipelineListItem.id).toBe(upstreamJSON._id);
32-
expect(pipelineListItem.description).toBe(upstreamJSON._source.description);
33-
expect(pipelineListItem.username).toBe(upstreamJSON._source.username);
34-
expect(pipelineListItem.last_modified).toBe(upstreamJSON._source.last_modified);
27+
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamId, upstreamJSON);
28+
expect(pipelineListItem.id).toBe(upstreamId);
29+
expect(pipelineListItem.description).toBe(upstreamJSON.apache.description);
30+
expect(pipelineListItem.username).toBe(upstreamJSON.apache.username);
31+
expect(pipelineListItem.last_modified).toBe(upstreamJSON.apache.last_modified);
3532
});
3633
});
3734

3835
describe('downstreamJSON getter method', () => {
3936
it('returns the downstreamJSON JSON', () => {
40-
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamJSON);
37+
const pipelineListItem = PipelineListItem.fromUpstreamJSON(upstreamId, upstreamJSON);
4138
const expectedDownstreamJSON = {
4239
id: 'apache',
4340
description: 'this is an apache pipeline',

x-pack/plugins/logstash/server/models/pipeline_list_item/pipeline_list_item.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*/
66

77
import { get } from 'lodash';
8-
import { Hit, PipelineListItemOptions } from '../../types';
8+
import { PipelineListItemOptions } from '../../types';
99

1010
export class PipelineListItem {
1111
public readonly id: string;
@@ -34,12 +34,12 @@ export class PipelineListItem {
3434
* Takes the json GET response from ES and constructs a pipeline model to be used
3535
* in Kibana downstream
3636
*/
37-
static fromUpstreamJSON(pipeline: Hit) {
37+
static fromUpstreamJSON(id: string, pipeline: Record<string, any>) {
3838
const opts = {
39-
id: pipeline._id,
40-
description: get(pipeline, '_source.description') as string,
41-
last_modified: get(pipeline, '_source.last_modified') as string,
42-
username: get(pipeline, '_source.username') as string,
39+
id,
40+
description: get(pipeline, id + '.description') as string,
41+
last_modified: get(pipeline, id + '.last_modified') as string,
42+
username: get(pipeline, id + '.username') as string,
4343
};
4444

4545
return new PipelineListItem(opts);

x-pack/plugins/logstash/server/routes/pipeline/delete.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
*/
66
import { schema } from '@kbn/config-schema';
77
import { IRouter } from 'src/core/server';
8-
import { INDEX_NAMES } from '../../../common/constants';
98
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
109

1110
import { checkLicense } from '../../lib/check_license';
@@ -25,10 +24,9 @@ export function registerPipelineDeleteRoute(router: IRouter) {
2524
router.handleLegacyErrors(async (context, request, response) => {
2625
const client = context.logstash!.esClient;
2726

28-
await client.callAsCurrentUser('delete', {
29-
index: INDEX_NAMES.PIPELINES,
30-
id: request.params.id,
31-
refresh: 'wait_for',
27+
await client.callAsCurrentUser('transport.request', {
28+
path: '/_logstash/pipeline/' + encodeURIComponent(request.params.id),
29+
method: 'DELETE',
3230
});
3331

3432
return response.noContent();

x-pack/plugins/logstash/server/routes/pipeline/load.ts

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import { schema } from '@kbn/config-schema';
77
import { IRouter } from 'src/core/server';
88

9-
import { INDEX_NAMES } from '../../../common/constants';
109
import { Pipeline } from '../../models/pipeline';
1110
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
1211
import { checkLicense } from '../../lib/check_license';
@@ -26,20 +25,21 @@ export function registerPipelineLoadRoute(router: IRouter) {
2625
router.handleLegacyErrors(async (context, request, response) => {
2726
const client = context.logstash!.esClient;
2827

29-
const result = await client.callAsCurrentUser('get', {
30-
index: INDEX_NAMES.PIPELINES,
31-
id: request.params.id,
32-
_source: ['description', 'username', 'pipeline', 'pipeline_settings'],
33-
ignore: [404],
34-
});
28+
try {
29+
const result = await client.callAsCurrentUser('transport.request', {
30+
path: '/_logstash/pipeline/' + encodeURIComponent(request.params.id),
31+
method: 'GET',
32+
});
33+
return response.ok({
34+
body: Pipeline.fromUpstreamJSON(result).downstreamJSON,
35+
});
36+
} catch (err) {
37+
if (err.statusCode === 404) {
38+
return response.notFound();
39+
}
3540

36-
if (!result.found) {
37-
return response.notFound();
41+
throw err;
3842
}
39-
40-
return response.ok({
41-
body: Pipeline.fromUpstreamJSON(result).downstreamJSON,
42-
});
4343
})
4444
)
4545
);

x-pack/plugins/logstash/server/routes/pipeline/save.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import { schema } from '@kbn/config-schema';
77
import { i18n } from '@kbn/i18n';
88
import { IRouter } from 'src/core/server';
99

10-
import { INDEX_NAMES } from '../../../common/constants';
1110
import { Pipeline } from '../../models/pipeline';
1211
import { wrapRouteWithLicenseCheck } from '../../../../licensing/server';
1312
import { SecurityPluginSetup } from '../../../../security/server';
@@ -41,11 +40,10 @@ export function registerPipelineSaveRoute(router: IRouter, security?: SecurityPl
4140
const client = context.logstash!.esClient;
4241
const pipeline = Pipeline.fromDownstreamJSON(request.body, request.params.id, username);
4342

44-
await client.callAsCurrentUser('index', {
45-
index: INDEX_NAMES.PIPELINES,
46-
id: pipeline.id,
43+
await client.callAsCurrentUser('transport.request', {
44+
path: '/_logstash/pipeline/' + encodeURIComponent(pipeline.id),
45+
method: 'PUT',
4746
body: pipeline.upstreamJSON,
48-
refresh: 'wait_for',
4947
});
5048

5149
return response.noContent();

0 commit comments

Comments
 (0)