Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/plugins/fleet/common/types/models/data_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface DataStream {
type: string;
package: string;
package_version: string;
last_activity: string;
last_activity_ms: number;
size_in_bytes: number;
dashboards: Array<{
id: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ export const DataStreamListPage: React.FunctionComponent<{}> = () => {
},
},
{
field: 'last_activity',
field: 'last_activity_ms',
sortable: true,
width: '25%',
dataType: 'date',
name: i18n.translate('xpack.fleet.dataStreamList.lastActivityColumnTitle', {
defaultMessage: 'Last activity',
}),
render: (date: DataStream['last_activity']) => {
render: (date: DataStream['last_activity_ms']) => {
try {
const formatter = fieldFormats.getInstance('date');
return formatter.convert(date);
Expand Down
276 changes: 153 additions & 123 deletions x-pack/plugins/fleet/server/routes/data_streams/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,157 +4,187 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { RequestHandler, SavedObjectsClientContract } from 'src/core/server';
import { keyBy, keys, merge } from 'lodash';
import { DataStream } from '../../types';
import { GetDataStreamsResponse, KibanaAssetType, KibanaSavedObjectType } from '../../../common';
import { getPackageSavedObjects, getKibanaSavedObject } from '../../services/epm/packages/get';
import { defaultIngestErrorHandler } from '../../errors';

const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*';

interface ESDataStreamInfoResponse {
data_streams: Array<{
name: string;
timestamp_field: {
name: string;
};
indices: Array<{ index_name: string; index_uuid: string }>;
generation: number;
_meta?: {
package?: {
name: string;
};
managed_by?: string;
managed?: boolean;
[key: string]: any;
};
status: string;
template: string;
ilm_policy: string;
hidden: boolean;
}>;
}

interface ESDataStreamStatsResponse {
data_streams: Array<{
data_stream: string;
backing_indices: number;
store_size_bytes: number;
maximum_timestamp: number;
}>;
}

export const getListHandler: RequestHandler = async (context, request, response) => {
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const body: GetDataStreamsResponse = {
data_streams: [],
};

try {
// Get stats (size on disk) of all potentially matching indices
const { indices: indexStats } = await callCluster('indices.stats', {
index: DATA_STREAM_INDEX_PATTERN,
metric: ['store'],
});
// Get matching data streams, their stats, and package SOs
const [
{ data_streams: dataStreamsInfo },
{ data_streams: dataStreamStats },
packageSavedObjects,
] = await Promise.all([
callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}`,
}) as Promise<ESDataStreamInfoResponse>,
callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}/_stats`,
}) as Promise<ESDataStreamStatsResponse>,
getPackageSavedObjects(context.core.savedObjects.client),
]);
const dataStreamsInfoByName = keyBy(dataStreamsInfo, 'name');
const dataStreamsStatsByName = keyBy(dataStreamStats, 'data_stream');

// Combine data stream info
const dataStreams = merge(dataStreamsInfoByName, dataStreamsStatsByName);
const dataStreamNames = keys(dataStreams);

// Map package SOs
const packageSavedObjectsByName = keyBy(packageSavedObjects.saved_objects, 'id');
const packageMetadata: any = {};

// Query additional information for each data stream
const dataStreamPromises = dataStreamNames.map(async (dataStreamName) => {
const dataStream = dataStreams[dataStreamName];
const dataStreamResponse: DataStream = {
index: dataStreamName,
dataset: '',
namespace: '',
type: '',
package: dataStream._meta?.package?.name || '',
package_version: '',
last_activity_ms: dataStream.maximum_timestamp,
size_in_bytes: dataStream.store_size_bytes,
dashboards: [],
};

// Get all matching indices and info about each
// This returns the top 100,000 indices (as buckets) by last activity
const { aggregations } = await callCluster('search', {
index: DATA_STREAM_INDEX_PATTERN,
body: {
size: 0,
query: {
bool: {
must: [
{
exists: {
field: 'data_stream.namespace',
// Query backing indices to extract data stream dataset, namespace, and type values
const {
aggregations: { dataset, namespace, type },
} = await callCluster('search', {
index: dataStream.indices.map((index) => index.index_name),
body: {
size: 0,
query: {
bool: {
must: [
{
exists: {
field: 'data_stream.namespace',
},
},
},
{
exists: {
field: 'data_stream.dataset',
{
exists: {
field: 'data_stream.dataset',
},
},
},
],
],
},
},
},
aggs: {
index: {
terms: {
field: '_index',
size: 100000,
order: {
last_activity: 'desc',
aggs: {
dataset: {
terms: {
field: 'data_stream.dataset',
size: 1,
},
},
aggs: {
dataset: {
terms: {
field: 'data_stream.dataset',
size: 1,
},
},
namespace: {
terms: {
field: 'data_stream.namespace',
size: 1,
},
namespace: {
terms: {
field: 'data_stream.namespace',
size: 1,
},
type: {
terms: {
field: 'data_stream.type',
size: 1,
},
},
last_activity: {
max: {
field: '@timestamp',
},
},
type: {
terms: {
field: 'data_stream.type',
size: 1,
},
},
},
},
},
});

const body: GetDataStreamsResponse = {
data_streams: [],
};

if (!(aggregations && aggregations.index && aggregations.index.buckets)) {
return response.ok({
body,
});
}

const {
index: { buckets: indexResults },
} = aggregations;

const packageSavedObjects = await getPackageSavedObjects(context.core.savedObjects.client);
const packageMetadata: any = {};

const dataStreamsPromises = (indexResults as any[]).map(async (result) => {
const {
key: indexName,
dataset: { buckets: datasetBuckets },
namespace: { buckets: namespaceBuckets },
type: { buckets: typeBuckets },
last_activity: { value_as_string: lastActivity },
} = result;

// We don't have a reliable way to associate index with package ID, so
// this is a hack to extract the package ID from the first part of the dataset name
// with fallback to extraction from index name
const pkg = datasetBuckets.length
? datasetBuckets[0].key.split('.')[0]
: indexName.split('-')[1].split('.')[0];
const pkgSavedObject = packageSavedObjects.saved_objects.filter((p) => p.id === pkg);

// if
// - the datastream is associated with a package
// - and the package has been installed through EPM
// - and we didn't pick the metadata in an earlier iteration of this map()
if (pkg !== '' && pkgSavedObject.length > 0 && !packageMetadata[pkg]) {
// then pick the dashboards from the package saved object
const dashboards =
pkgSavedObject[0].attributes?.installed_kibana?.filter(
(o) => o.type === KibanaSavedObjectType.dashboard
) || [];
// and then pick the human-readable titles from the dashboard saved objects
const enhancedDashboards = await getEnhancedDashboards(
context.core.savedObjects.client,
dashboards
);

packageMetadata[pkg] = {
version: pkgSavedObject[0].attributes?.version || '',
dashboards: enhancedDashboards,
};
// Set values from backing indices query
dataStreamResponse.dataset = dataset.buckets[0]?.key || '';
dataStreamResponse.namespace = namespace.buckets[0]?.key || '';
dataStreamResponse.type = type.buckets[0]?.key || '';

// Find package saved object
const pkgName = dataStreamResponse.package;
const pkgSavedObject = pkgName ? packageSavedObjectsByName[pkgName] : null;

if (pkgSavedObject) {
// if
// - the data stream is associated with a package
// - and the package has been installed through EPM
// - and we didn't pick the metadata in an earlier iteration of this map()
if (!packageMetadata[pkgName]) {
// then pick the dashboards from the package saved object
const dashboards =
pkgSavedObject.attributes?.installed_kibana?.filter(
(o) => o.type === KibanaSavedObjectType.dashboard
) || [];
// and then pick the human-readable titles from the dashboard saved objects
const enhancedDashboards = await getEnhancedDashboards(
context.core.savedObjects.client,
dashboards
);

packageMetadata[pkgName] = {
version: pkgSavedObject.attributes?.version || '',
dashboards: enhancedDashboards,
};
}

// Set values from package information
dataStreamResponse.package = pkgName;
dataStreamResponse.package_version = packageMetadata[pkgName].version;
dataStreamResponse.dashboards = packageMetadata[pkgName].dashboards;
}

return {
index: indexName,
dataset: datasetBuckets.length ? datasetBuckets[0].key : '',
namespace: namespaceBuckets.length ? namespaceBuckets[0].key : '',
type: typeBuckets.length ? typeBuckets[0].key : '',
package: pkgSavedObject.length ? pkg : '',
package_version: packageMetadata[pkg] ? packageMetadata[pkg].version : '',
last_activity: lastActivity,
size_in_bytes: indexStats[indexName] ? indexStats[indexName].total.store.size_in_bytes : 0,
dashboards: packageMetadata[pkg] ? packageMetadata[pkg].dashboards : [],
};
return dataStreamResponse;
});

const dataStreams: DataStream[] = await Promise.all(dataStreamsPromises);

body.data_streams = dataStreams;

// Return final data streams objects sorted by last activity, decending
// After filtering out data streams that are missing dataset/namespace/type fields
body.data_streams = (await Promise.all(dataStreamPromises))
.filter(({ dataset, namespace, type }) => dataset && namespace && type)
.sort((a, b) => b.last_activity_ms - a.last_activity_ms);
return response.ok({
body,
});
Expand Down
11 changes: 11 additions & 0 deletions x-pack/test/fleet_api_integration/apis/data_streams/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export default function loadTests({ loadTestFile }) {
describe('Data Stream Endpoints', () => {
loadTestFile(require.resolve('./list'));
});
}
Loading