Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { flagSupportedClusters } from './flag_supported_clusters';
import { getMlJobsForCluster } from '../elasticsearch';
import { getKibanasForClusters } from '../kibana';
import { getLogstashForClusters } from '../logstash';
import { getPipelines } from '../logstash/get_pipelines';
import { getPipelines, logstashPipelineWithoutPipelineFieldsExists } from '../logstash/get_pipelines';
import { getBeatsForClusters } from '../beats';
import { alertsClustersAggregation } from '../../cluster_alerts/alerts_clusters_aggregation';
import { alertsClusterSearch } from '../../cluster_alerts/alerts_cluster_search';
Expand Down Expand Up @@ -123,14 +123,15 @@ export async function getClustersFromRequest(req, indexPatterns, { clusterUuid,
// add logstash data
const logstashes = await getLogstashForClusters(req, lsIndexPattern, clusters);

const resp = await logstashPipelineWithoutPipelineFieldsExists(req, lsIndexPattern);
const clusterPipelineNodesCount = await getPipelines(req, lsIndexPattern, ['logstash_cluster_pipeline_nodes_count']);

// add the logstash data to each cluster
logstashes.forEach(logstash => {
const clusterIndex = findIndex(clusters, { cluster_uuid: logstash.clusterUuid });

// withhold LS overview stats until pipeline metrics have at least one full bucket
if (logstash.clusterUuid === req.params.clusterUuid && clusterPipelineNodesCount.length === 0) {
if (logstash.clusterUuid === req.params.clusterUuid && clusterPipelineNodesCount.length === 0 && !resp) {
logstash.stats = {};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ export function getLogstashForClusters(req, lsIndexPattern, clusters) {
size: config.get('xpack.monitoring.max_bucket_size')
}
},
with_no_pipeline: {
missing: {
field: 'logstash_stats.pipelines.id',
},
},
pipelines_nested: {
nested: {
path: 'logstash_stats.pipelines'
Expand Down Expand Up @@ -186,6 +191,7 @@ export function getLogstashForClusters(req, lsIndexPattern, clusters) {
const aggregations = get(result, 'aggregations', {});
const logstashUuids = get(aggregations, 'logstash_uuids.buckets', []);
const logstashVersions = get(aggregations, 'logstash_versions.buckets', []);
const withoutPipelineField = get(aggregations, 'with_no_pipeline.doc_count', 0);

// everything is initialized such that it won't impact any rollup
let eventsInTotal = 0;
Expand All @@ -212,7 +218,7 @@ export function getLogstashForClusters(req, lsIndexPattern, clusters) {
avg_memory: memory,
avg_memory_used: memoryUsed,
max_uptime: maxUptime,
pipeline_count: get(aggregations, 'pipelines_nested.pipelines.value', 0),
pipeline_count: get(aggregations, 'pipelines_nested.pipelines.value', 0) + withoutPipelineField ? 1 : 0,
queue_types: getQueueTypes(get(aggregations, 'pipelines_nested.queue_types.buckets', [])),
versions: logstashVersions.map(versionBucket => versionBucket.key)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { cloneDeep, last, omit } from 'lodash';
import { cloneDeep, last, omit, get } from 'lodash';
import { checkParam } from '../error_missing_required';
import { getMetrics } from '../details/get_metrics';

Expand Down Expand Up @@ -52,6 +52,12 @@ export function _handleResponse(response) {
return pipelines;
}

export async function logstashPipelineWithoutPipelineFieldsExists(req, logstashIndexPattern) {
const metricResponse = await getMetrics(req, logstashIndexPattern, ['logstash_cluster_no_pipelines_count']);
const noPipelinesCount = get(metricResponse, 'logstash_cluster_no_pipelines_count', []);
return noPipelinesCount.reduce((acc, { data }) => acc || data.some(([_a, b]) => b === true), false);
}

export async function processPipelinesAPIResponse(response, throughputMetricKey, nodesCountMetricKey) {
// Clone to avoid mutating original response
const processedResponse = cloneDeep(response);
Expand Down
23 changes: 23 additions & 0 deletions x-pack/plugins/monitoring/server/lib/metrics/logstash/classes.js
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,24 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric {
}
}

export class LogstashPipelineNodeWithoutPipelineCountMetric extends LogstashMetric {
constructor(opts) {
super({
...opts,
derivative: false
});

this.dateHistogramSubAggs = {
with_no_pipeline: {
missing: {
field: 'logstash_stats.pipelines.id',
},
}
};
this.calculation = bucket => !!_.get(bucket, 'with_no_pipeline.doc_count', undefined);
}
}

export class LogstashPipelineNodeCountMetric extends LogstashMetric {
constructor(opts) {
super({
Expand All @@ -354,6 +372,11 @@ export class LogstashPipelineNodeCountMetric extends LogstashMetric {
});

this.dateHistogramSubAggs = {
with_no_pipeline: {
missing: {
field: 'logstash_stats.pipelines.id',
},
},
pipelines_nested: {
nested: {
path: 'logstash_stats.pipelines'
Expand Down
14 changes: 13 additions & 1 deletion x-pack/plugins/monitoring/server/lib/metrics/logstash/metrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import {
LogstashMetric,
LogstashPipelineQueueSizeMetric,
LogstashPipelineThroughputMetric,
LogstashPipelineNodeCountMetric
LogstashPipelineNodeCountMetric,
LogstashPipelineNodeWithoutPipelineCountMetric
} from './classes';
import {
LARGE_FLOAT,
Expand Down Expand Up @@ -341,6 +342,17 @@ export const metrics = {
format: LARGE_FLOAT,
units: ''
}),
logstash_cluster_no_pipelines_count: new LogstashPipelineNodeWithoutPipelineCountMetric({
field: 'logstash_stats.logstash.uuid',
label: i18n.translate('xpack.monitoring.metrics.logstashInstance.pipelinesWithoutIdCountLabel', {
defaultMessage: 'Pipeline Without ID'
}),
description: i18n.translate('xpack.monitoring.metrics.logstashInstance.pipelinesWithoutIdCountDescription', {
defaultMessage: 'Denotes whether an instance is logging a pipeline without later pipeline fields.'
}),
format: LARGE_FLOAT,
units: ''
}),
logstash_node_pipeline_nodes_count: new LogstashPipelineNodeCountMetric({
uuidField: 'logstash_stats.logstash.uuid', // TODO: add comment explaining why
field: 'logstash_stats.logstash.uuid',
Expand Down