Merged
2 changes: 1 addition & 1 deletion x-pack/plugins/fleet/common/types/models/data_stream.ts
@@ -11,7 +11,7 @@ export interface DataStream {
type: string;
package: string;
package_version: string;
last_activity: string;
last_activity_ms: number;
size_in_bytes: number;
dashboards: Array<{
id: string;
@@ -121,14 +121,14 @@ export const DataStreamListPage: React.FunctionComponent<{}> = () => {
},
},
{
field: 'last_activity',
field: 'last_activity_ms',
sortable: true,
width: '25%',
dataType: 'date',
name: i18n.translate('xpack.fleet.dataStreamList.lastActivityColumnTitle', {
defaultMessage: 'Last activity',
}),
render: (date: DataStream['last_activity']) => {
render: (date: DataStream['last_activity_ms']) => {
try {
const formatter = fieldFormats.getInstance('date');
return formatter.convert(date);
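For context on the rename above: `last_activity` used to arrive as a preformatted date string, while `last_activity_ms` is an epoch-milliseconds number, so the table can sort numerically and only format for display. A minimal, self-contained sketch of that idea (the sample values and the fallback formatter below are illustrative, not code from this PR):

// Illustrative only: numeric sort on last_activity_ms, with a simple
// Date-based conversion standing in for the date field formatter.
interface DataStreamRow {
  index: string;
  last_activity_ms: number;
}

const rows: DataStreamRow[] = [
  { index: 'logs-nginx.access-default', last_activity_ms: 1612224000000 },
  { index: 'metrics-system.cpu-default', last_activity_ms: 1612137600000 },
];

// Newest first — the reason the field is a number rather than a string.
rows.sort((a, b) => b.last_activity_ms - a.last_activity_ms);

for (const row of rows) {
  console.log(row.index, new Date(row.last_activity_ms).toISOString());
}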
272 changes: 149 additions & 123 deletions x-pack/plugins/fleet/server/routes/data_streams/handlers.ts
@@ -4,157 +4,183 @@
* you may not use this file except in compliance with the Elastic License.
*/
import { RequestHandler, SavedObjectsClientContract } from 'src/core/server';
import { keyBy, keys, merge } from 'lodash';
import { DataStream } from '../../types';
import { GetDataStreamsResponse, KibanaAssetType, KibanaSavedObjectType } from '../../../common';
import { getPackageSavedObjects, getKibanaSavedObject } from '../../services/epm/packages/get';
import { defaultIngestErrorHandler } from '../../errors';

const DATA_STREAM_INDEX_PATTERN = 'logs-*-*,metrics-*-*,traces-*-*';

interface ESDataStreamInfoResponse {
data_streams: Array<{
name: string;
timestamp_field: {
name: string;
};
indices: Array<{ index_name: string; index_uuid: string }>;
generation: number;
_meta?: {
package?: {
name: string;
};
managed_by?: string;
managed?: boolean;
[key: string]: any;
};
status: string;
template: string;
ilm_policy: string;
hidden: boolean;
}>;
}

interface ESDataStreamStatsResponse {
data_streams: Array<{
data_stream: string;
backing_indices: number;
store_size_bytes: number;
maximum_timestamp: number;
}>;
}
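
To make those shapes concrete, here is an illustrative pair of payloads that satisfy the two interfaces above (the values are invented; the field names follow the Elasticsearch get data stream and data stream stats responses):

// Illustrative only — example responses matching ESDataStreamInfoResponse and ESDataStreamStatsResponse.
const exampleInfo: ESDataStreamInfoResponse = {
  data_streams: [
    {
      name: 'logs-nginx.access-default',
      timestamp_field: { name: '@timestamp' },
      indices: [{ index_name: '.ds-logs-nginx.access-default-000001', index_uuid: 'aBc123' }],
      generation: 1,
      _meta: { package: { name: 'nginx' }, managed_by: 'ingest-manager', managed: true },
      status: 'GREEN',
      template: 'logs-nginx.access',
      ilm_policy: 'logs',
      hidden: false,
    },
  ],
};

const exampleStats: ESDataStreamStatsResponse = {
  data_streams: [
    {
      data_stream: 'logs-nginx.access-default',
      backing_indices: 1,
      store_size_bytes: 1024,
      maximum_timestamp: 1612224000000,
    },
  ],
};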

export const getListHandler: RequestHandler = async (context, request, response) => {
const callCluster = context.core.elasticsearch.legacy.client.callAsCurrentUser;
const body: GetDataStreamsResponse = {
data_streams: [],
};

try {
// Get stats (size on disk) of all potentially matching indices
const { indices: indexStats } = await callCluster('indices.stats', {
index: DATA_STREAM_INDEX_PATTERN,
metric: ['store'],
});
// Get matching data streams
const { data_streams: dataStreamsInfo } = (await callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}`,
})) as ESDataStreamInfoResponse;
const dataStreamsInfoByName = keyBy(dataStreamsInfo, 'name');

Contributor: These seem better defined in a service than a http handler.

// Get data stream stats
const { data_streams: dataStreamStats } = (await callCluster('transport.request', {
method: 'GET',
path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}/_stats`,
})) as ESDataStreamStatsResponse;
const dataStreamsStatsByName = keyBy(dataStreamStats, 'data_stream');

Contributor: It seems we can do the callCluster calls in parallel instead of serially.

Contributor Author (@jen-huang, Feb 1, 2021): updated! I tossed the packaged SO retrieval in there too

// Combine data stream info
const dataStreams = merge(dataStreamsInfoByName, dataStreamsStatsByName);
const dataStreamNames = keys(dataStreams);

// Get all package SOs
const packageSavedObjects = await getPackageSavedObjects(context.core.savedObjects.client);
const packageSavedObjectsByName = keyBy(packageSavedObjects.saved_objects, 'id');
const packageMetadata: any = {};
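
Per the review note above about running the callCluster calls in parallel, a rough sketch of how the two Elasticsearch requests and the package saved object query could be issued together (it reuses identifiers already defined in this handler and is an illustration, not necessarily the exact merged code):

// Sketch: kick off both ES requests and the saved object query concurrently.
const [infoResponse, statsResponse, packageSavedObjects] = await Promise.all([
  callCluster('transport.request', {
    method: 'GET',
    path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}`,
  }),
  callCluster('transport.request', {
    method: 'GET',
    path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}/_stats`,
  }),
  getPackageSavedObjects(context.core.savedObjects.client),
]);
const dataStreamsInfo = (infoResponse as ESDataStreamInfoResponse).data_streams;
const dataStreamStats = (statsResponse as ESDataStreamStatsResponse).data_streams;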

// Query additional information for each data stream
const dataStreamPromises = dataStreamNames.map(async (dataStreamName) => {
const dataStream = dataStreams[dataStreamName];
const dataStreamResponse: DataStream = {
index: dataStreamName,
dataset: '',
namespace: '',
type: '',
package: dataStream._meta?.package?.name || '',
package_version: '',
last_activity_ms: dataStream.maximum_timestamp,
size_in_bytes: dataStream.store_size_bytes,
dashboards: [],
};

- // Get all matching indices and info about each
- // This returns the top 100,000 indices (as buckets) by last activity
- const { aggregations } = await callCluster('search', {
-   index: DATA_STREAM_INDEX_PATTERN,
-   body: {
-     size: 0,
-     query: {
-       bool: {
-         must: [
-           {
-             exists: {
-               field: 'data_stream.namespace',
-             },
-           },
-           {
-             exists: {
-               field: 'data_stream.dataset',
-             },
-           },
-         ],
-       },
-     },
-     aggs: {
-       index: {
-         terms: {
-           field: '_index',
-           size: 100000,
-           order: {
-             last_activity: 'desc',
-           },
-         },
-         aggs: {
-           dataset: {
-             terms: {
-               field: 'data_stream.dataset',
-               size: 1,
-             },
-           },
-           namespace: {
-             terms: {
-               field: 'data_stream.namespace',
-               size: 1,
-             },
-           },
-           type: {
-             terms: {
-               field: 'data_stream.type',
-               size: 1,
-             },
-           },
-           last_activity: {
-             max: {
-               field: '@timestamp',
-             },
-           },
-         },
-       },
-     },
-   },
- });
+ // Query backing indices to extract data stream dataset, namespace, and type values
+ const {
+   aggregations: { dataset, namespace, type },
+ } = await callCluster('search', {
+   index: dataStream.indices.map((index) => index.index_name),
+   body: {
+     size: 0,
+     query: {
+       bool: {
+         must: [
+           {
+             exists: {
+               field: 'data_stream.namespace',
+             },
+           },
+           {
+             exists: {
+               field: 'data_stream.dataset',
+             },
+           },
+         ],
+       },
+     },
+     aggs: {
+       dataset: {
+         terms: {
+           field: 'data_stream.dataset',
+           size: 1,
+         },
+       },
+       namespace: {
+         terms: {
+           field: 'data_stream.namespace',
+           size: 1,
+         },
+       },
+       type: {
+         terms: {
+           field: 'data_stream.type',
+           size: 1,
+         },
+       },
+     },
+   },
+ });

Contributor: I wonder if we could find a better way here for doing this. Instead of checking all the indices for it we could check what the values are in the mapping of the write index? Or if we run the query, could we ensure it only touches one index (most recent one) and not all of them? But then, what if this index was rollover but has no data inside yet?

Contributor Author (@jen-huang): > what if this index was rollover but has no data inside yet
At some point I wrote the index name to be the last backing index (write index), but changed it for this exact reason :) I couldn't think of a better way to get these values but am open to suggestions.

Contributor: I would tend to trust ES here to be efficient in its aggregations, and leave it like this, until we know that it's a problem. Probing several indices until we find the newest one that has data is probably slower. Do we have a system with lots of rolled-over backing indices we could test with?
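
For reference, the first alternative floated in the thread above — reading the values from the write index's mapping instead of running an aggregation — could look roughly like the sketch below. It assumes the data_stream.* fields are mapped as constant_keyword with a value already set on the write index (which, as the discussion notes, is not guaranteed right after a rollover), and it is not what this PR implements:

// Sketch of the reviewer's alternative: inspect the write index (last backing
// index) and read the constant_keyword values recorded in its mapping.
const writeIndex = dataStream.indices[dataStream.indices.length - 1].index_name;
const mappingResponse = await callCluster('indices.getMapping', { index: writeIndex });
const dataStreamProps =
  mappingResponse[writeIndex]?.mappings?.properties?.data_stream?.properties || {};
const datasetFromMapping = dataStreamProps.dataset?.value || '';
const namespaceFromMapping = dataStreamProps.namespace?.value || '';
const typeFromMapping = dataStreamProps.type?.value || '';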

const body: GetDataStreamsResponse = {
data_streams: [],
};

if (!(aggregations && aggregations.index && aggregations.index.buckets)) {
return response.ok({
body,
});
}

const {
index: { buckets: indexResults },
} = aggregations;

const packageSavedObjects = await getPackageSavedObjects(context.core.savedObjects.client);
const packageMetadata: any = {};

const dataStreamsPromises = (indexResults as any[]).map(async (result) => {
const {
key: indexName,
dataset: { buckets: datasetBuckets },
namespace: { buckets: namespaceBuckets },
type: { buckets: typeBuckets },
last_activity: { value_as_string: lastActivity },
} = result;

// We don't have a reliable way to associate index with package ID, so
// this is a hack to extract the package ID from the first part of the dataset name
// with fallback to extraction from index name
const pkg = datasetBuckets.length
? datasetBuckets[0].key.split('.')[0]
: indexName.split('-')[1].split('.')[0];
const pkgSavedObject = packageSavedObjects.saved_objects.filter((p) => p.id === pkg);

// if
// - the datastream is associated with a package
// - and the package has been installed through EPM
// - and we didn't pick the metadata in an earlier iteration of this map()
if (pkg !== '' && pkgSavedObject.length > 0 && !packageMetadata[pkg]) {
// then pick the dashboards from the package saved object
const dashboards =
pkgSavedObject[0].attributes?.installed_kibana?.filter(
(o) => o.type === KibanaSavedObjectType.dashboard
) || [];
// and then pick the human-readable titles from the dashboard saved objects
const enhancedDashboards = await getEnhancedDashboards(
context.core.savedObjects.client,
dashboards
);

packageMetadata[pkg] = {
version: pkgSavedObject[0].attributes?.version || '',
dashboards: enhancedDashboards,
};
// Set values from backing indices query
dataStreamResponse.dataset = dataset.buckets[0]?.key || '';
dataStreamResponse.namespace = namespace.buckets[0]?.key || '';
dataStreamResponse.type = type.buckets[0]?.key || '';

// Find package saved object
const pkgName = dataStreamResponse.package;
const pkgSavedObject = pkgName ? packageSavedObjectsByName[pkgName] : null;

if (pkgSavedObject) {
// if
// - the data stream is associated with a package
// - and the package has been installed through EPM
// - and we didn't pick the metadata in an earlier iteration of this map()
if (!packageMetadata[pkgName]) {
// then pick the dashboards from the package saved object
const dashboards =
pkgSavedObject.attributes?.installed_kibana?.filter(
(o) => o.type === KibanaSavedObjectType.dashboard
) || [];
// and then pick the human-readable titles from the dashboard saved objects
const enhancedDashboards = await getEnhancedDashboards(
context.core.savedObjects.client,
dashboards
);

packageMetadata[pkgName] = {
version: pkgSavedObject.attributes?.version || '',
dashboards: enhancedDashboards,
};
}

// Set values from package information
dataStreamResponse.package = pkgName;
dataStreamResponse.package_version = packageMetadata[pkgName].version;
dataStreamResponse.dashboards = packageMetadata[pkgName].dashboards;
}
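
getEnhancedDashboards is referenced here but defined elsewhere in this file; purely as a hypothetical sketch of the kind of lookup such a helper performs — resolving human-readable dashboard titles via the saved objects client — it might look something like this (the bulkGet call and the title attribute are assumptions, not the actual implementation):

// Hypothetical sketch only — resolve dashboard titles for a package's dashboards.
const getEnhancedDashboardsSketch = async (
  savedObjectsClient: SavedObjectsClientContract,
  dashboards: Array<{ id: string }>
) => {
  if (!dashboards.length) {
    return [];
  }
  const { saved_objects: dashboardSavedObjects } = await savedObjectsClient.bulkGet<{
    title?: string;
  }>(dashboards.map(({ id }) => ({ id, type: 'dashboard' })));

  return dashboardSavedObjects.map((so) => ({
    id: so.id,
    title: so.attributes?.title || so.id,
  }));
};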

return {
index: indexName,
dataset: datasetBuckets.length ? datasetBuckets[0].key : '',
namespace: namespaceBuckets.length ? namespaceBuckets[0].key : '',
type: typeBuckets.length ? typeBuckets[0].key : '',
package: pkgSavedObject.length ? pkg : '',
package_version: packageMetadata[pkg] ? packageMetadata[pkg].version : '',
last_activity: lastActivity,
size_in_bytes: indexStats[indexName] ? indexStats[indexName].total.store.size_in_bytes : 0,
dashboards: packageMetadata[pkg] ? packageMetadata[pkg].dashboards : [],
};
return dataStreamResponse;
});

const dataStreams: DataStream[] = await Promise.all(dataStreamsPromises);

body.data_streams = dataStreams;

// Return final data stream objects sorted by last activity, descending
// After filtering out data streams that are missing dataset/namespace/type fields
body.data_streams = (await Promise.all(dataStreamPromises))
  .filter(({ dataset, namespace, type }) => dataset && namespace && type)
  .sort((a, b) => b.last_activity_ms - a.last_activity_ms);

Contributor: This handler has more logic/statements than most others. What can we move to the service layer?
I think we can move lines 53-177 into a service allowing something like:

Suggested change:
- body.data_streams = (await Promise.all(dataStreamPromises))
+ body.data_streams = await service.method()

Perhaps it's more than that, but I think we should move as much of this code as possible into a service.

Contributor Author (@jen-huang): When this code was written originally, it was an intentional decision to not create a data stream service yet and just leave the code in the handler because 1) there's only 1 data stream API (list), 2) no other place in the plugins needs access to this logic, and 3) no downstream plugins needs access either. I'd like to keep this simple for now until one of these situations change. Is this blocking for you?

Contributor: No, it's not a blocker. Service or not, I think that code should be pulled into functions to keep the handler easier to read/test, but that can be done later.

return response.ok({
body,
});
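
Following the last review thread, a rough sketch of how the two Elasticsearch lookups could be pulled out of the handler into small helpers (the function names and the inline callCluster type are hypothetical, not part of this PR):

// Hypothetical helpers — e.g. candidates for a future data stream service module.
type CallCluster = (endpoint: string, params: Record<string, any>) => Promise<any>;

const getDataStreamsInfo = async (callCluster: CallCluster) =>
  ((await callCluster('transport.request', {
    method: 'GET',
    path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}`,
  })) as ESDataStreamInfoResponse).data_streams;

const getDataStreamsStats = async (callCluster: CallCluster) =>
  ((await callCluster('transport.request', {
    method: 'GET',
    path: `/_data_stream/${DATA_STREAM_INDEX_PATTERN}/_stats`,
  })) as ESDataStreamStatsResponse).data_streams;

// The handler body could then start with something like:
//   const [dataStreamsInfo, dataStreamStats] = await Promise.all([
//     getDataStreamsInfo(callCluster),
//     getDataStreamsStats(callCluster),
//   ]);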
11 changes: 11 additions & 0 deletions x-pack/test/fleet_api_integration/apis/data_streams/index.js
@@ -0,0 +1,11 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export default function loadTests({ loadTestFile }) {
describe('Data Stream Endpoints', () => {
loadTestFile(require.resolve('./list'));
});
}
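
For completeness, a hedged sketch of the kind of test './list' might contain, using the standard FTR providers; the endpoint path and the assertions here are assumptions rather than details confirmed by this diff:

// Hypothetical sketch of a './list' test case — path and expectations are assumptions.
export default function ({ getService }) {
  const supertest = getService('supertest');

  describe('data_stream_list', () => {
    it('returns an array of data streams', async () => {
      const { body } = await supertest.get('/api/fleet/data_streams').expect(200);

      if (!Array.isArray(body.data_streams)) {
        throw new Error('expected data_streams to be an array');
      }
    });
  });
}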