Skip to content

Commit c441be7

Browse files
[Fleet] Update data streams mappings directly instead of against backing indices (#89660) (#89892)
* Update data streams mappings directly instead of querying for backing indices, update integration tests to test with multiple namespaces * Add flag to only update mappings of the current write index Co-authored-by: Kibana Machine <[email protected]> Co-authored-by: Kibana Machine <[email protected]>
1 parent d803dc1 commit c441be7

File tree

2 files changed

+112
-135
lines changed

2 files changed

+112
-135
lines changed

x-pack/plugins/fleet/server/services/epm/elasticsearch/template/template.ts

Lines changed: 29 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import {
1111
TemplateRef,
1212
IndexTemplate,
1313
IndexTemplateMappings,
14-
DataType,
1514
} from '../../../../types';
1615
import { getRegistryDataStreamAssetBaseName } from '../index';
1716

@@ -26,8 +25,8 @@ interface MultiFields {
2625
export interface IndexTemplateMapping {
2726
[key: string]: any;
2827
}
29-
export interface CurrentIndex {
30-
indexName: string;
28+
export interface CurrentDataStream {
29+
dataStreamName: string;
3130
indexTemplate: IndexTemplate;
3231
}
3332
const DEFAULT_SCALING_FACTOR = 1000;
@@ -348,60 +347,60 @@ export const updateCurrentWriteIndices = async (
348347
): Promise<void> => {
349348
if (!templates.length) return;
350349

351-
const allIndices = await queryIndicesFromTemplates(callCluster, templates);
350+
const allIndices = await queryDataStreamsFromTemplates(callCluster, templates);
352351
if (!allIndices.length) return;
353-
return updateAllIndices(allIndices, callCluster);
352+
return updateAllDataStreams(allIndices, callCluster);
354353
};
355354

356-
function isCurrentIndex(item: CurrentIndex[] | undefined): item is CurrentIndex[] {
355+
function isCurrentDataStream(item: CurrentDataStream[] | undefined): item is CurrentDataStream[] {
357356
return item !== undefined;
358357
}
359358

360-
const queryIndicesFromTemplates = async (
359+
const queryDataStreamsFromTemplates = async (
361360
callCluster: CallESAsCurrentUser,
362361
templates: TemplateRef[]
363-
): Promise<CurrentIndex[]> => {
364-
const indexPromises = templates.map((template) => {
365-
return getIndices(callCluster, template);
362+
): Promise<CurrentDataStream[]> => {
363+
const dataStreamPromises = templates.map((template) => {
364+
return getDataStreams(callCluster, template);
366365
});
367-
const indexObjects = await Promise.all(indexPromises);
368-
return indexObjects.filter(isCurrentIndex).flat();
366+
const dataStreamObjects = await Promise.all(dataStreamPromises);
367+
return dataStreamObjects.filter(isCurrentDataStream).flat();
369368
};
370369

371-
const getIndices = async (
370+
const getDataStreams = async (
372371
callCluster: CallESAsCurrentUser,
373372
template: TemplateRef
374-
): Promise<CurrentIndex[] | undefined> => {
373+
): Promise<CurrentDataStream[] | undefined> => {
375374
const { templateName, indexTemplate } = template;
376-
// Until ES provides a way to update mappings of a data stream
377-
// get the last index of the data stream, which is the current write index
378375
const res = await callCluster('transport.request', {
379376
method: 'GET',
380377
path: `/_data_stream/${templateName}-*`,
381378
});
382379
const dataStreams = res.data_streams;
383380
if (!dataStreams.length) return;
384381
return dataStreams.map((dataStream: any) => ({
385-
indexName: dataStream.indices[dataStream.indices.length - 1].index_name,
382+
dataStreamName: dataStream.name,
386383
indexTemplate,
387384
}));
388385
};
389386

390-
const updateAllIndices = async (
391-
indexNameWithTemplates: CurrentIndex[],
387+
const updateAllDataStreams = async (
388+
indexNameWithTemplates: CurrentDataStream[],
392389
callCluster: CallESAsCurrentUser
393390
): Promise<void> => {
394-
const updateIndexPromises = indexNameWithTemplates.map(({ indexName, indexTemplate }) => {
395-
return updateExistingIndex({ indexName, callCluster, indexTemplate });
396-
});
397-
await Promise.all(updateIndexPromises);
391+
const updatedataStreamPromises = indexNameWithTemplates.map(
392+
({ dataStreamName, indexTemplate }) => {
393+
return updateExistingDataStream({ dataStreamName, callCluster, indexTemplate });
394+
}
395+
);
396+
await Promise.all(updatedataStreamPromises);
398397
};
399-
const updateExistingIndex = async ({
400-
indexName,
398+
const updateExistingDataStream = async ({
399+
dataStreamName,
401400
callCluster,
402401
indexTemplate,
403402
}: {
404-
indexName: string;
403+
dataStreamName: string;
405404
callCluster: CallESAsCurrentUser;
406405
indexTemplate: IndexTemplate;
407406
}) => {
@@ -416,53 +415,13 @@ const updateExistingIndex = async ({
416415
// try to update the mappings first
417416
try {
418417
await callCluster('indices.putMapping', {
419-
index: indexName,
418+
index: dataStreamName,
420419
body: mappings,
420+
write_index_only: true,
421421
});
422422
// if update fails, rollover data stream
423423
} catch (err) {
424424
try {
425-
// get the data_stream values to compose datastream name
426-
const searchDataStreamFieldsResponse = await callCluster('search', {
427-
index: indexTemplate.index_patterns[0],
428-
body: {
429-
size: 1,
430-
_source: ['data_stream.namespace', 'data_stream.type', 'data_stream.dataset'],
431-
query: {
432-
bool: {
433-
filter: [
434-
{
435-
exists: {
436-
field: 'data_stream.type',
437-
},
438-
},
439-
{
440-
exists: {
441-
field: 'data_stream.dataset',
442-
},
443-
},
444-
{
445-
exists: {
446-
field: 'data_stream.namespace',
447-
},
448-
},
449-
],
450-
},
451-
},
452-
},
453-
});
454-
if (searchDataStreamFieldsResponse.hits.total.value === 0)
455-
throw new Error('data_stream fields are missing from datastream indices');
456-
const {
457-
dataset,
458-
namespace,
459-
type,
460-
}: {
461-
dataset: string;
462-
namespace: string;
463-
type: DataType;
464-
} = searchDataStreamFieldsResponse.hits.hits[0]._source.data_stream;
465-
const dataStreamName = `${type}-${dataset}-${namespace}`;
466425
const path = `/${dataStreamName}/_rollover`;
467426
await callCluster('transport.request', {
468427
method: 'POST',
@@ -478,10 +437,10 @@ const updateExistingIndex = async ({
478437
if (!settings.index.default_pipeline) return;
479438
try {
480439
await callCluster('indices.putSettings', {
481-
index: indexName,
440+
index: dataStreamName,
482441
body: { index: { default_pipeline: settings.index.default_pipeline } },
483442
});
484443
} catch (err) {
485-
throw new Error(`could not update index template settings for ${indexName}`);
444+
throw new Error(`could not update index template settings for ${dataStreamName}`);
486445
}
487446
};

x-pack/test/fleet_api_integration/apis/epm/data_stream.ts

Lines changed: 83 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,14 @@ export default function (providerContext: FtrProviderContext) {
1212
const { getService } = providerContext;
1313
const supertest = getService('supertest');
1414
const es = getService('es');
15-
const dockerServers = getService('dockerServers');
16-
const server = dockerServers.get('registry');
1715
const pkgName = 'datastreams';
1816
const pkgVersion = '0.1.0';
1917
const pkgUpdateVersion = '0.2.0';
2018
const pkgKey = `${pkgName}-${pkgVersion}`;
2119
const pkgUpdateKey = `${pkgName}-${pkgUpdateVersion}`;
2220
const logsTemplateName = `logs-${pkgName}.test_logs`;
2321
const metricsTemplateName = `metrics-${pkgName}.test_metrics`;
22+
const namespaces = ['default', 'foo', 'bar'];
2423

2524
const uninstallPackage = async (pkg: string) => {
2625
await supertest.delete(`/api/fleet/epm/packages/${pkg}`).set('kbn-xsrf', 'xxxx');
@@ -35,86 +34,105 @@ export default function (providerContext: FtrProviderContext) {
3534

3635
describe('datastreams', async () => {
3736
skipIfNoDockerRegistry(providerContext);
37+
3838
beforeEach(async () => {
3939
await installPackage(pkgKey);
40-
await es.transport.request({
41-
method: 'POST',
42-
path: `/${logsTemplateName}-default/_doc`,
43-
body: {
44-
'@timestamp': '2015-01-01',
45-
logs_test_name: 'test',
46-
data_stream: {
47-
dataset: `${pkgName}.test_logs`,
48-
namespace: 'default',
49-
type: 'logs',
50-
},
51-
},
52-
});
53-
await es.transport.request({
54-
method: 'POST',
55-
path: `/${metricsTemplateName}-default/_doc`,
56-
body: {
57-
'@timestamp': '2015-01-01',
58-
logs_test_name: 'test',
59-
data_stream: {
60-
dataset: `${pkgName}.test_metrics`,
61-
namespace: 'default',
62-
type: 'metrics',
63-
},
64-
},
65-
});
40+
await Promise.all(
41+
namespaces.map(async (namespace) => {
42+
const createLogsRequest = es.transport.request({
43+
method: 'POST',
44+
path: `/${logsTemplateName}-${namespace}/_doc`,
45+
body: {
46+
'@timestamp': '2015-01-01',
47+
logs_test_name: 'test',
48+
data_stream: {
49+
dataset: `${pkgName}.test_logs`,
50+
namespace,
51+
type: 'logs',
52+
},
53+
},
54+
});
55+
const createMetricsRequest = es.transport.request({
56+
method: 'POST',
57+
path: `/${metricsTemplateName}-${namespace}/_doc`,
58+
body: {
59+
'@timestamp': '2015-01-01',
60+
logs_test_name: 'test',
61+
data_stream: {
62+
dataset: `${pkgName}.test_metrics`,
63+
namespace,
64+
type: 'metrics',
65+
},
66+
},
67+
});
68+
return Promise.all([createLogsRequest, createMetricsRequest]);
69+
})
70+
);
6671
});
72+
6773
afterEach(async () => {
68-
if (!server.enabled) return;
69-
await es.transport.request({
70-
method: 'DELETE',
71-
path: `/_data_stream/${logsTemplateName}-default`,
72-
});
73-
await es.transport.request({
74-
method: 'DELETE',
75-
path: `/_data_stream/${metricsTemplateName}-default`,
76-
});
74+
await Promise.all(
75+
namespaces.map(async (namespace) => {
76+
const deleteLogsRequest = es.transport.request({
77+
method: 'DELETE',
78+
path: `/_data_stream/${logsTemplateName}-${namespace}`,
79+
});
80+
const deleteMetricsRequest = es.transport.request({
81+
method: 'DELETE',
82+
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
83+
});
84+
return Promise.all([deleteLogsRequest, deleteMetricsRequest]);
85+
})
86+
);
7787
await uninstallPackage(pkgKey);
7888
await uninstallPackage(pkgUpdateKey);
7989
});
90+
8091
it('should list the logs and metrics datastream', async function () {
81-
const resLogsDatastream = await es.transport.request({
82-
method: 'GET',
83-
path: `/_data_stream/${logsTemplateName}-default`,
84-
});
85-
const resMetricsDatastream = await es.transport.request({
86-
method: 'GET',
87-
path: `/_data_stream/${metricsTemplateName}-default`,
92+
namespaces.forEach(async (namespace) => {
93+
const resLogsDatastream = await es.transport.request({
94+
method: 'GET',
95+
path: `/_data_stream/${logsTemplateName}-${namespace}`,
96+
});
97+
const resMetricsDatastream = await es.transport.request({
98+
method: 'GET',
99+
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
100+
});
101+
expect(resLogsDatastream.body.data_streams.length).equal(1);
102+
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1);
103+
expect(resMetricsDatastream.body.data_streams.length).equal(1);
104+
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
88105
});
89-
expect(resLogsDatastream.body.data_streams.length).equal(1);
90-
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(1);
91-
expect(resMetricsDatastream.body.data_streams.length).equal(1);
92-
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
93106
});
94107

95108
it('after update, it should have rolled over logs datastream because mappings are not compatible and not metrics', async function () {
96109
await installPackage(pkgUpdateKey);
97-
const resLogsDatastream = await es.transport.request({
98-
method: 'GET',
99-
path: `/_data_stream/${logsTemplateName}-default`,
100-
});
101-
const resMetricsDatastream = await es.transport.request({
102-
method: 'GET',
103-
path: `/_data_stream/${metricsTemplateName}-default`,
110+
namespaces.forEach(async (namespace) => {
111+
const resLogsDatastream = await es.transport.request({
112+
method: 'GET',
113+
path: `/_data_stream/${logsTemplateName}-${namespace}`,
114+
});
115+
const resMetricsDatastream = await es.transport.request({
116+
method: 'GET',
117+
path: `/_data_stream/${metricsTemplateName}-${namespace}`,
118+
});
119+
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
120+
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
104121
});
105-
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
106-
expect(resMetricsDatastream.body.data_streams[0].indices.length).equal(1);
107122
});
123+
108124
it('should be able to upgrade a package after a rollover', async function () {
109-
await es.transport.request({
110-
method: 'POST',
111-
path: `/${logsTemplateName}-default/_rollover`,
112-
});
113-
const resLogsDatastream = await es.transport.request({
114-
method: 'GET',
115-
path: `/_data_stream/${logsTemplateName}-default`,
125+
namespaces.forEach(async (namespace) => {
126+
await es.transport.request({
127+
method: 'POST',
128+
path: `/${logsTemplateName}-${namespace}/_rollover`,
129+
});
130+
const resLogsDatastream = await es.transport.request({
131+
method: 'GET',
132+
path: `/_data_stream/${logsTemplateName}-${namespace}`,
133+
});
134+
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
116135
});
117-
expect(resLogsDatastream.body.data_streams[0].indices.length).equal(2);
118136
await installPackage(pkgUpdateKey);
119137
});
120138
});

0 commit comments

Comments
 (0)