diff --git a/x-pack/plugins/monitoring/server/lib/cluster/get_clusters_from_request.js b/x-pack/plugins/monitoring/server/lib/cluster/get_clusters_from_request.js index fc571dac19b0f..446ae302b0956 100644 --- a/x-pack/plugins/monitoring/server/lib/cluster/get_clusters_from_request.js +++ b/x-pack/plugins/monitoring/server/lib/cluster/get_clusters_from_request.js @@ -11,7 +11,7 @@ import { flagSupportedClusters } from './flag_supported_clusters'; import { getMlJobsForCluster } from '../elasticsearch'; import { getKibanasForClusters } from '../kibana'; import { getLogstashForClusters } from '../logstash'; -import { getPipelines } from '../logstash/get_pipelines'; +import { getPipelines, logstashPipelineWithoutPipelineFieldsExists } from '../logstash/get_pipelines'; import { getBeatsForClusters } from '../beats'; import { alertsClustersAggregation } from '../../cluster_alerts/alerts_clusters_aggregation'; import { alertsClusterSearch } from '../../cluster_alerts/alerts_cluster_search'; @@ -123,6 +123,7 @@ export async function getClustersFromRequest(req, indexPatterns, { clusterUuid, // add logstash data const logstashes = await getLogstashForClusters(req, lsIndexPattern, clusters); + const resp = await logstashPipelineWithoutPipelineFieldsExists(req, lsIndexPattern); const clusterPipelineNodesCount = await getPipelines(req, lsIndexPattern, ['logstash_cluster_pipeline_nodes_count']); // add the logstash data to each cluster @@ -130,7 +131,7 @@ export async function getClustersFromRequest(req, indexPatterns, { clusterUuid, const clusterIndex = findIndex(clusters, { cluster_uuid: logstash.clusterUuid }); // withhold LS overview stats until pipeline metrics have at least one full bucket - if (logstash.clusterUuid === req.params.clusterUuid && clusterPipelineNodesCount.length === 0) { + if (logstash.clusterUuid === req.params.clusterUuid && clusterPipelineNodesCount.length === 0 && !resp) { logstash.stats = {}; } diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_logstash_for_clusters.js b/x-pack/plugins/monitoring/server/lib/logstash/get_logstash_for_clusters.js index 0bd527523ad7d..00680d852fa6b 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_logstash_for_clusters.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_logstash_for_clusters.js @@ -125,6 +125,11 @@ export function getLogstashForClusters(req, lsIndexPattern, clusters) { size: config.get('xpack.monitoring.max_bucket_size') } }, + with_no_pipeline: { + missing: { + field: 'logstash_stats.pipelines.id', + }, + }, pipelines_nested: { nested: { path: 'logstash_stats.pipelines' @@ -186,6 +191,7 @@ export function getLogstashForClusters(req, lsIndexPattern, clusters) { const aggregations = get(result, 'aggregations', {}); const logstashUuids = get(aggregations, 'logstash_uuids.buckets', []); const logstashVersions = get(aggregations, 'logstash_versions.buckets', []); + const withoutPipelineField = get(aggregations, 'with_no_pipeline.doc_count', 0); // everything is initialized such that it won't impact any rollup let eventsInTotal = 0; @@ -212,7 +218,7 @@ export function getLogstashForClusters(req, lsIndexPattern, clusters) { avg_memory: memory, avg_memory_used: memoryUsed, max_uptime: maxUptime, - pipeline_count: get(aggregations, 'pipelines_nested.pipelines.value', 0), + pipeline_count: get(aggregations, 'pipelines_nested.pipelines.value', 0) + withoutPipelineField ? 1 : 0, queue_types: getQueueTypes(get(aggregations, 'pipelines_nested.queue_types.buckets', [])), versions: logstashVersions.map(versionBucket => versionBucket.key) } diff --git a/x-pack/plugins/monitoring/server/lib/logstash/get_pipelines.js b/x-pack/plugins/monitoring/server/lib/logstash/get_pipelines.js index 8cec101477ecf..70b9ebc94b03a 100644 --- a/x-pack/plugins/monitoring/server/lib/logstash/get_pipelines.js +++ b/x-pack/plugins/monitoring/server/lib/logstash/get_pipelines.js @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { cloneDeep, last, omit } from 'lodash'; +import { cloneDeep, last, omit, get } from 'lodash'; import { checkParam } from '../error_missing_required'; import { getMetrics } from '../details/get_metrics'; @@ -52,6 +52,12 @@ export function _handleResponse(response) { return pipelines; } +export async function logstashPipelineWithoutPipelineFieldsExists(req, logstashIndexPattern) { + const metricResponse = await getMetrics(req, logstashIndexPattern, ['logstash_cluster_no_pipelines_count']); + const noPipelinesCount = get(metricResponse, 'logstash_cluster_no_pipelines_count', []); + return noPipelinesCount.reduce((acc, { data }) => acc || data.some(([_a, b]) => b === true), false); +} + export async function processPipelinesAPIResponse(response, throughputMetricKey, nodesCountMetricKey) { // Clone to avoid mutating original response const processedResponse = cloneDeep(response); diff --git a/x-pack/plugins/monitoring/server/lib/metrics/logstash/classes.js b/x-pack/plugins/monitoring/server/lib/metrics/logstash/classes.js index dade736cd53f8..8c025673c03b5 100644 --- a/x-pack/plugins/monitoring/server/lib/metrics/logstash/classes.js +++ b/x-pack/plugins/monitoring/server/lib/metrics/logstash/classes.js @@ -346,6 +346,24 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric { } } +export class LogstashPipelineNodeWithoutPipelineCountMetric extends LogstashMetric { + constructor(opts) { + super({ + ...opts, + derivative: false + }); + + this.dateHistogramSubAggs = { + with_no_pipeline: { + missing: { + field: 'logstash_stats.pipelines.id', + }, + } + }; + this.calculation = bucket => !!_.get(bucket, 'with_no_pipeline.doc_count', undefined); + } +} + export class LogstashPipelineNodeCountMetric extends LogstashMetric { constructor(opts) { super({ @@ -354,6 +372,11 @@ export class LogstashPipelineNodeCountMetric extends LogstashMetric { }); this.dateHistogramSubAggs = { + with_no_pipeline: { + missing: { + field: 'logstash_stats.pipelines.id', + }, + }, pipelines_nested: { nested: { path: 'logstash_stats.pipelines' diff --git a/x-pack/plugins/monitoring/server/lib/metrics/logstash/metrics.js b/x-pack/plugins/monitoring/server/lib/metrics/logstash/metrics.js index a87f79533da3b..bd9a15530689e 100644 --- a/x-pack/plugins/monitoring/server/lib/metrics/logstash/metrics.js +++ b/x-pack/plugins/monitoring/server/lib/metrics/logstash/metrics.js @@ -13,7 +13,8 @@ import { LogstashMetric, LogstashPipelineQueueSizeMetric, LogstashPipelineThroughputMetric, - LogstashPipelineNodeCountMetric + LogstashPipelineNodeCountMetric, + LogstashPipelineNodeWithoutPipelineCountMetric } from './classes'; import { LARGE_FLOAT, @@ -341,6 +342,17 @@ export const metrics = { format: LARGE_FLOAT, units: '' }), + logstash_cluster_no_pipelines_count: new LogstashPipelineNodeWithoutPipelineCountMetric({ + field: 'logstash_stats.logstash.uuid', + label: i18n.translate('xpack.monitoring.metrics.logstashInstance.pipelinesWithoutIdCountLabel', { + defaultMessage: 'Pipeline Without ID' + }), + description: i18n.translate('xpack.monitoring.metrics.logstashInstance.pipelinesWithoutIdCountDescription', { + defaultMessage: 'Denotes whether an instance is logging a pipeline without later pipeline fields.' + }), + format: LARGE_FLOAT, + units: '' + }), logstash_node_pipeline_nodes_count: new LogstashPipelineNodeCountMetric({ uuidField: 'logstash_stats.logstash.uuid', // TODO: add comment explaining why field: 'logstash_stats.logstash.uuid',