Skip to content

Commit 129cf4f

Browse files
authored
[Ingest] Support yaml variables in datasource (#64459)
1 parent 408ad6f commit 129cf4f

File tree

3 files changed

+125
-42
lines changed

3 files changed

+125
-42
lines changed

x-pack/plugins/ingest_manager/server/services/datasource.ts

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66
import { SavedObjectsClientContract } from 'src/core/server';
7-
import { safeLoad } from 'js-yaml';
87
import { AuthenticatedUser } from '../../../security/server';
98
import {
109
DeleteDatasourcesResponse,
@@ -239,20 +238,13 @@ async function _assignPackageStreamToStream(
239238
throw new Error(`Stream template not found for dataset ${dataset}`);
240239
}
241240

242-
// Populate template variables from input config and stream config
243-
const data: { [k: string]: string | string[] } = {};
244-
if (input.vars) {
245-
for (const key of Object.keys(input.vars)) {
246-
data[key] = input.vars[key].value;
247-
}
248-
}
249-
if (stream.vars) {
250-
for (const key of Object.keys(stream.vars)) {
251-
data[key] = stream.vars[key].value;
252-
}
253-
}
254-
const yaml = safeLoad(createStream(data, pkgStream.buffer.toString()));
241+
const yaml = createStream(
242+
// Populate template variables from input vars and stream vars
243+
Object.assign({}, input.vars, stream.vars),
244+
pkgStream.buffer.toString()
245+
);
255246
stream.agent_stream = yaml;
247+
256248
return { ...stream };
257249
}
258250

x-pack/plugins/ingest_manager/server/services/epm/agent/agent.test.ts

Lines changed: 55 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,61 @@
66

77
import { createStream } from './agent';
88

9-
test('Test creating a stream from template', () => {
10-
const streamTemplate = `
11-
input: log
12-
paths:
13-
{{#each paths}}
14-
- {{this}}
15-
{{/each}}
16-
exclude_files: [".gz$"]
17-
processors:
18-
- add_locale: ~
19-
`;
20-
const vars = {
21-
paths: ['/usr/local/var/log/nginx/access.log'],
22-
};
9+
describe('createStream', () => {
10+
it('should work', () => {
11+
const streamTemplate = `
12+
input: log
13+
paths:
14+
{{#each paths}}
15+
- {{this}}
16+
{{/each}}
17+
exclude_files: [".gz$"]
18+
processors:
19+
- add_locale: ~
20+
`;
21+
const vars = {
22+
paths: { value: ['/usr/local/var/log/nginx/access.log'] },
23+
};
2324

24-
const output = createStream(vars, streamTemplate);
25+
const output = createStream(vars, streamTemplate);
26+
expect(output).toEqual({
27+
input: 'log',
28+
paths: ['/usr/local/var/log/nginx/access.log'],
29+
exclude_files: ['.gz$'],
30+
processors: [{ add_locale: null }],
31+
});
32+
});
2533

26-
expect(output).toBe(`
27-
input: log
28-
paths:
29-
- /usr/local/var/log/nginx/access.log
30-
exclude_files: [".gz$"]
31-
processors:
32-
- add_locale: ~
33-
`);
34+
it('should support yaml values', () => {
35+
const streamTemplate = `
36+
input: redis/metrics
37+
metricsets: ["key"]
38+
test: null
39+
{{#if key.patterns}}
40+
key.patterns: {{key.patterns}}
41+
{{/if}}
42+
`;
43+
const vars = {
44+
'key.patterns': {
45+
type: 'yaml',
46+
value: `
47+
- limit: 20
48+
pattern: '*'
49+
`,
50+
},
51+
};
52+
53+
const output = createStream(vars, streamTemplate);
54+
expect(output).toEqual({
55+
input: 'redis/metrics',
56+
metricsets: ['key'],
57+
test: null,
58+
'key.patterns': [
59+
{
60+
limit: 20,
61+
pattern: '*',
62+
},
63+
],
64+
});
65+
});
3466
});

x-pack/plugins/ingest_manager/server/services/epm/agent/agent.ts

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,71 @@
55
*/
66

77
import Handlebars from 'handlebars';
8+
import { safeLoad } from 'js-yaml';
9+
import { DatasourceConfigRecord } from '../../../../common';
810

9-
interface StreamVars {
10-
[k: string]: string | string[];
11+
function isValidKey(key: string) {
12+
return key !== '__proto__' && key !== 'constructor' && key !== 'prototype';
1113
}
1214

13-
export function createStream(vars: StreamVars, streamTemplate: string) {
14-
const template = Handlebars.compile(streamTemplate);
15-
return template(vars);
15+
function replaceVariablesInYaml(yamlVariables: { [k: string]: any }, yaml: any) {
16+
if (Object.keys(yamlVariables).length === 0 || !yaml) {
17+
return yaml;
18+
}
19+
20+
Object.entries(yaml).forEach(([key, value]: [string, any]) => {
21+
if (typeof value === 'object') {
22+
yaml[key] = replaceVariablesInYaml(yamlVariables, value);
23+
}
24+
if (typeof value === 'string' && value in yamlVariables) {
25+
yaml[key] = yamlVariables[value];
26+
}
27+
});
28+
29+
return yaml;
30+
}
31+
32+
function buildTemplateVariables(variables: DatasourceConfigRecord) {
33+
const yamlValues: { [k: string]: any } = {};
34+
const vars = Object.entries(variables).reduce((acc, [key, recordEntry]) => {
35+
// support variables with . like key.patterns
36+
const keyParts = key.split('.');
37+
const lastKeyPart = keyParts.pop();
38+
39+
if (!lastKeyPart || !isValidKey(lastKeyPart)) {
40+
throw new Error('Invalid key');
41+
}
42+
43+
let varPart = acc;
44+
for (const keyPart of keyParts) {
45+
if (!isValidKey(keyPart)) {
46+
throw new Error('Invalid key');
47+
}
48+
if (!varPart[keyPart]) {
49+
varPart[keyPart] = {};
50+
}
51+
varPart = varPart[keyPart];
52+
}
53+
54+
if (recordEntry.type && recordEntry.type === 'yaml') {
55+
const yamlKeyPlaceholder = `##${key}##`;
56+
varPart[lastKeyPart] = `"${yamlKeyPlaceholder}"`;
57+
yamlValues[yamlKeyPlaceholder] = recordEntry.value ? safeLoad(recordEntry.value) : null;
58+
} else {
59+
varPart[lastKeyPart] = recordEntry.value;
60+
}
61+
return acc;
62+
}, {} as { [k: string]: any });
63+
64+
return { vars, yamlValues };
65+
}
66+
67+
export function createStream(variables: DatasourceConfigRecord, streamTemplate: string) {
68+
const { vars, yamlValues } = buildTemplateVariables(variables);
69+
70+
const template = Handlebars.compile(streamTemplate, { noEscape: true });
71+
const stream = template(vars);
72+
const yamlFromStream = safeLoad(stream, {});
73+
74+
return replaceVariablesInYaml(yamlValues, yamlFromStream);
1675
}

0 commit comments

Comments
 (0)