diff --git a/x-pack/plugins/fleet/common/types/models/data_stream.ts b/x-pack/plugins/fleet/common/types/models/data_stream.ts index abc9ffcf6be6a..3bebdfcf9d997 100644 --- a/x-pack/plugins/fleet/common/types/models/data_stream.ts +++ b/x-pack/plugins/fleet/common/types/models/data_stream.ts @@ -11,7 +11,7 @@ export interface DataStream { type: string; package: string; package_version: string; - last_activity: string; + last_activity_ms: number; size_in_bytes: number; dashboards: Array<{ id: string; diff --git a/x-pack/plugins/fleet/public/applications/fleet/sections/data_stream/list_page/index.tsx b/x-pack/plugins/fleet/public/applications/fleet/sections/data_stream/list_page/index.tsx index c614518c1930b..23fa4025a93dc 100644 --- a/x-pack/plugins/fleet/public/applications/fleet/sections/data_stream/list_page/index.tsx +++ b/x-pack/plugins/fleet/public/applications/fleet/sections/data_stream/list_page/index.tsx @@ -121,14 +121,14 @@ export const DataStreamListPage: React.FunctionComponent<{}> = () => { }, }, { - field: 'last_activity', + field: 'last_activity_ms', sortable: true, width: '25%', dataType: 'date', name: i18n.translate('xpack.fleet.dataStreamList.lastActivityColumnTitle', { defaultMessage: 'Last activity', }), - render: (date: DataStream['last_activity']) => { + render: (date: DataStream['last_activity_ms']) => { try { const formatter = fieldFormats.getInstance('date'); return formatter.convert(date); diff --git a/x-pack/plugins/fleet/server/routes/data_streams/handlers.ts b/x-pack/plugins/fleet/server/routes/data_streams/handlers.ts index 4820f25c05f96..e9487ef792b63 100644 --- a/x-pack/plugins/fleet/server/routes/data_streams/handlers.ts +++ b/x-pack/plugins/fleet/server/routes/data_streams/handlers.ts @@ -4,6 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ import { RequestHandler, SavedObjectsClientContract } from 'src/core/server'; +import { keyBy, keys, merge } from 'lodash'; import { DataStream } from '../../types'; import { GetDataStreamsResponse, KibanaAssetType, KibanaSavedObjectType } from '../../../common'; import { getPackageSavedObjects, getKibanaSavedObject } from '../../services/epm/packages/get'; @@ -11,150 +12,179 @@ import { defaultIngestErrorHandler } from '../../errors'; const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*,traces-*-*'; +interface ESDataStreamInfoResponse { + data_streams: Array<{ + name: string; + timestamp_field: { + name: string; + }; + indices: Array<{ index_name: string; index_uuid: string }>; + generation: number; + _meta?: { + package?: { + name: string; + }; + managed_by?: string; + managed?: boolean; + [key: string]: any; + }; + status: string; + template: string; + ilm_policy: string; + hidden: boolean; + }>; +} + +interface ESDataStreamStatsResponse { + data_streams: Array<{ + data_stream: string; + backing_indices: number; + store_size_bytes: number; + maximum_timestamp: number; + }>; +} + export const getListHandler: RequestHandler = async (context, request, response) => { const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser; + const body: GetDataStreamsResponse = { + data_streams: [], + }; try { - // Get stats (size on disk) of all potentially matching indices - const { indices: indexStats } = await callCluster('indices.stats', { - index: DATA_STREAM_INDEX_PATTERN, - metric: ['store'], - }); + // Get matching data streams, their stats, and package SOs + const [ + { data_streams: dataStreamsInfo }, + { data_streams: dataStreamStats }, + packageSavedObjects, + ] = await Promise.all([ + callCluster('transport.request', { + method: 'GET', + path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}`, + }) as Promise, + callCluster('transport.request', { + method: 'GET', + path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}/_stats`, + }) as Promise, + getPackageSavedObjects(context.core.savedObjects.client), + ]); + const dataStreamsInfoByName = keyBy(dataStreamsInfo, 'name'); + const dataStreamsStatsByName = keyBy(dataStreamStats, 'data_stream'); + + // Combine data stream info + const dataStreams = merge(dataStreamsInfoByName, dataStreamsStatsByName); + const dataStreamNames = keys(dataStreams); + + // Map package SOs + const packageSavedObjectsByName = keyBy(packageSavedObjects.saved_objects, 'id'); + const packageMetadata: any = {}; + + // Query additional information for each data stream + const dataStreamPromises = dataStreamNames.map(async (dataStreamName) => { + const dataStream = dataStreams[dataStreamName]; + const dataStreamResponse: DataStream = { + index: dataStreamName, + dataset: '', + namespace: '', + type: '', + package: dataStream._meta?.package?.name || '', + package_version: '', + last_activity_ms: dataStream.maximum_timestamp, + size_in_bytes: dataStream.store_size_bytes, + dashboards: [], + }; - // Get all matching indices and info about each - // This returns the top 100,000 indices (as buckets) by last activity - const { aggregations } = await callCluster('search', { - index: DATA_STREAM_INDEX_PATTERN, - body: { - size: 0, - query: { - bool: { - must: [ - { - exists: { - field: 'data_stream.namespace', + // Query backing indices to extract data stream dataset, namespace, and type values + const { + aggregations: { dataset, namespace, type }, + } = await callCluster('search', { + index: dataStream.indices.map((index) => index.index_name), + body: { + size: 0, + query: { + bool: { + must: [ + { + exists: { + field: 'data_stream.namespace', + }, }, - }, - { - exists: { - field: 'data_stream.dataset', + { + exists: { + field: 'data_stream.dataset', + }, }, - }, - ], + ], + }, }, - }, - aggs: { - index: { - terms: { - field: '_index', - size: 100000, - order: { - last_activity: 'desc', + aggs: { + dataset: { + terms: { + field: 'data_stream.dataset', + size: 1, }, }, - aggs: { - dataset: { - terms: { - field: 'data_stream.dataset', - size: 1, - }, - }, - namespace: { - terms: { - field: 'data_stream.namespace', - size: 1, - }, + namespace: { + terms: { + field: 'data_stream.namespace', + size: 1, }, - type: { - terms: { - field: 'data_stream.type', - size: 1, - }, - }, - last_activity: { - max: { - field: '@timestamp', - }, + }, + type: { + terms: { + field: 'data_stream.type', + size: 1, }, }, }, }, - }, - }); - - const body: GetDataStreamsResponse = { - data_streams: [], - }; - - if (!(aggregations && aggregations.index && aggregations.index.buckets)) { - return response.ok({ - body, }); - } - const { - index: { buckets: indexResults }, - } = aggregations; - - const packageSavedObjects = await getPackageSavedObjects(context.core.savedObjects.client); - const packageMetadata: any = {}; - - const dataStreamsPromises = (indexResults as any[]).map(async (result) => { - const { - key: indexName, - dataset: { buckets: datasetBuckets }, - namespace: { buckets: namespaceBuckets }, - type: { buckets: typeBuckets }, - last_activity: { value_as_string: lastActivity }, - } = result; - - // We don't have a reliable way to associate index with package ID, so - // this is a hack to extract the package ID from the first part of the dataset name - // with fallback to extraction from index name - const pkg = datasetBuckets.length - ? datasetBuckets[0].key.split('.')[0] - : indexName.split('-')[1].split('.')[0]; - const pkgSavedObject = packageSavedObjects.saved_objects.filter((p) => p.id === pkg); - - // if - // - the datastream is associated with a package - // - and the package has been installed through EPM - // - and we didn't pick the metadata in an earlier iteration of this map() - if (pkg !== '' && pkgSavedObject.length > 0 && !packageMetadata[pkg]) { - // then pick the dashboards from the package saved object - const dashboards = - pkgSavedObject[0].attributes?.installed_kibana?.filter( - (o) => o.type === KibanaSavedObjectType.dashboard - ) || []; - // and then pick the human-readable titles from the dashboard saved objects - const enhancedDashboards = await getEnhancedDashboards( - context.core.savedObjects.client, - dashboards - ); - - packageMetadata[pkg] = { - version: pkgSavedObject[0].attributes?.version || '', - dashboards: enhancedDashboards, - }; + // Set values from backing indices query + dataStreamResponse.dataset = dataset.buckets[0]?.key || ''; + dataStreamResponse.namespace = namespace.buckets[0]?.key || ''; + dataStreamResponse.type = type.buckets[0]?.key || ''; + + // Find package saved object + const pkgName = dataStreamResponse.package; + const pkgSavedObject = pkgName ? packageSavedObjectsByName[pkgName] : null; + + if (pkgSavedObject) { + // if + // - the data stream is associated with a package + // - and the package has been installed through EPM + // - and we didn't pick the metadata in an earlier iteration of this map() + if (!packageMetadata[pkgName]) { + // then pick the dashboards from the package saved object + const dashboards = + pkgSavedObject.attributes?.installed_kibana?.filter( + (o) => o.type === KibanaSavedObjectType.dashboard + ) || []; + // and then pick the human-readable titles from the dashboard saved objects + const enhancedDashboards = await getEnhancedDashboards( + context.core.savedObjects.client, + dashboards + ); + + packageMetadata[pkgName] = { + version: pkgSavedObject.attributes?.version || '', + dashboards: enhancedDashboards, + }; + } + + // Set values from package information + dataStreamResponse.package = pkgName; + dataStreamResponse.package_version = packageMetadata[pkgName].version; + dataStreamResponse.dashboards = packageMetadata[pkgName].dashboards; } - return { - index: indexName, - dataset: datasetBuckets.length ? datasetBuckets[0].key : '', - namespace: namespaceBuckets.length ? namespaceBuckets[0].key : '', - type: typeBuckets.length ? typeBuckets[0].key : '', - package: pkgSavedObject.length ? pkg : '', - package_version: packageMetadata[pkg] ? packageMetadata[pkg].version : '', - last_activity: lastActivity, - size_in_bytes: indexStats[indexName] ? indexStats[indexName].total.store.size_in_bytes : 0, - dashboards: packageMetadata[pkg] ? packageMetadata[pkg].dashboards : [], - }; + return dataStreamResponse; }); - const dataStreams: DataStream[] = await Promise.all(dataStreamsPromises); - - body.data_streams = dataStreams; - + // Return final data streams objects sorted by last activity, decending + // After filtering out data streams that are missing dataset/namespace/type fields + body.data_streams = (await Promise.all(dataStreamPromises)) + .filter(({ dataset, namespace, type }) => dataset && namespace && type) + .sort((a, b) => b.last_activity_ms - a.last_activity_ms); return response.ok({ body, }); diff --git a/x-pack/test/fleet_api_integration/apis/data_streams/index.js b/x-pack/test/fleet_api_integration/apis/data_streams/index.js new file mode 100644 index 0000000000000..30c1351edc177 --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/data_streams/index.js @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export default function loadTests({ loadTestFile }) { + describe('Data Stream Endpoints', () => { + loadTestFile(require.resolve('./list')); + }); +} diff --git a/x-pack/test/fleet_api_integration/apis/data_streams/list.ts b/x-pack/test/fleet_api_integration/apis/data_streams/list.ts new file mode 100644 index 0000000000000..9a26b3ac73177 --- /dev/null +++ b/x-pack/test/fleet_api_integration/apis/data_streams/list.ts @@ -0,0 +1,171 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import expect from '@kbn/expect'; +import { FtrProviderContext } from '../../../api_integration/ftr_provider_context'; +import { skipIfNoDockerRegistry } from '../../helpers'; + +export default function (providerContext: FtrProviderContext) { + const { getService } = providerContext; + const supertest = getService('supertest'); + const es = getService('es'); + const retry = getService('retry'); + const pkgName = 'datastreams'; + const pkgVersion = '0.1.0'; + const pkgKey = `${pkgName}-${pkgVersion}`; + const logsTemplateName = `logs-${pkgName}.test_logs`; + const metricsTemplateName = `metrics-${pkgName}.test_metrics`; + + const uninstallPackage = async (pkg: string) => { + await supertest.delete(`/api/fleet/epm/packages/${pkg}`).set('kbn-xsrf', 'xxxx'); + }; + + const installPackage = async (pkg: string) => { + return await supertest + .post(`/api/fleet/epm/packages/${pkg}`) + .set('kbn-xsrf', 'xxxx') + .send({ force: true }) + .expect(200); + }; + + const seedDataStreams = async () => { + await es.transport.request({ + method: 'POST', + path: `/${logsTemplateName}-default/_doc`, + body: { + '@timestamp': '2015-01-01', + logs_test_name: 'test', + data_stream: { + dataset: `${pkgName}.test_logs`, + namespace: 'default', + type: 'logs', + }, + }, + }); + await es.transport.request({ + method: 'POST', + path: `/${metricsTemplateName}-default/_doc`, + body: { + '@timestamp': '2015-01-01', + logs_test_name: 'test', + data_stream: { + dataset: `${pkgName}.test_metrics`, + namespace: 'default', + type: 'metrics', + }, + }, + }); + }; + + const getDataStreams = async () => { + return await supertest.get(`/api/fleet/data_streams`).set('kbn-xsrf', 'xxxx'); + }; + + describe('data_streams_list', async () => { + skipIfNoDockerRegistry(providerContext); + + beforeEach(async () => { + await installPackage(pkgKey); + }); + + afterEach(async () => { + await uninstallPackage(pkgKey); + try { + await es.transport.request({ + method: 'DELETE', + path: `/_data_stream/${logsTemplateName}-default`, + }); + await es.transport.request({ + method: 'DELETE', + path: `/_data_stream/${metricsTemplateName}-default`, + }); + } catch (e) { + // Silently swallow errors here as not all tests seed data streams + } + }); + + it("should return no data streams when there isn't any data yet", async function () { + const { body } = await getDataStreams(); + expect(body).to.eql({ data_streams: [] }); + }); + + it('should return correct data stream information', async function () { + await seedDataStreams(); + await retry.tryForTime(10000, async () => { + const { body } = await getDataStreams(); + return expect( + body.data_streams.map((dataStream: any) => { + // eslint-disable-next-line @typescript-eslint/naming-convention + const { index, size_in_bytes, ...rest } = dataStream; + return rest; + }) + ).to.eql([ + { + dataset: 'datastreams.test_logs', + namespace: 'default', + type: 'logs', + package: 'datastreams', + package_version: '0.1.0', + last_activity_ms: 1420070400000, + dashboards: [], + }, + { + dataset: 'datastreams.test_metrics', + namespace: 'default', + type: 'metrics', + package: 'datastreams', + package_version: '0.1.0', + last_activity_ms: 1420070400000, + dashboards: [], + }, + ]); + }); + }); + + it('should return correct number of data streams regardless of number of backing indices', async function () { + await seedDataStreams(); + await retry.tryForTime(10000, async () => { + const { body } = await getDataStreams(); + return expect(body.data_streams.length).to.eql(2); + }); + + // Rollover data streams to increase # of backing indices and seed the new write index + await es.transport.request({ + method: 'POST', + path: `/${logsTemplateName}-default/_rollover`, + }); + await es.transport.request({ + method: 'POST', + path: `/${metricsTemplateName}-default/_rollover`, + }); + await seedDataStreams(); + + // Wait until backing indices are created + await retry.tryForTime(10000, async () => { + const { body } = await es.transport.request({ + method: 'GET', + path: `/${logsTemplateName}-default,${metricsTemplateName}-default/_search`, + body: { + size: 0, + aggs: { + index: { + terms: { + field: '_index', + size: 100000, + }, + }, + }, + }, + }); + expect(body.aggregations.index.buckets.length).to.eql(4); + }); + + // Check that data streams still return correctly + const { body } = await getDataStreams(); + return expect(body.data_streams.length).to.eql(2); + }); + }); +} diff --git a/x-pack/test/fleet_api_integration/apis/index.js b/x-pack/test/fleet_api_integration/apis/index.js index f472599652224..061042c1fe7ad 100644 --- a/x-pack/test/fleet_api_integration/apis/index.js +++ b/x-pack/test/fleet_api_integration/apis/index.js @@ -9,8 +9,10 @@ export default function ({ loadTestFile }) { this.tags('ciGroup10'); // Fleet setup loadTestFile(require.resolve('./fleet_setup')); + // Agent setup loadTestFile(require.resolve('./agents_setup')); + // Agents loadTestFile(require.resolve('./agents/delete')); loadTestFile(require.resolve('./agents/list')); @@ -24,7 +26,7 @@ export default function ({ loadTestFile }) { loadTestFile(require.resolve('./agents/upgrade')); loadTestFile(require.resolve('./agents/reassign')); - // Enrollement API keys + // Enrollment API keys loadTestFile(require.resolve('./enrollment_api_keys/crud')); // EPM @@ -38,6 +40,9 @@ export default function ({ loadTestFile }) { // Agent policies loadTestFile(require.resolve('./agent_policy/index')); + // Data Streams + loadTestFile(require.resolve('./data_streams/index')); + // Settings loadTestFile(require.resolve('./settings/index')); });