Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,31 @@
* 2.0.
*/

export const createDatasetFilter = (legacyType: string, dataset: string) => ({
/**
* We expected metricset and dataset to match but it appears to be the case that
* dataset is the full {product}.{metricset}, whereas metricset doesn't include
* the product, e.g. dataset is elasticsearch.cluster_stats and metricset is
* just cluster_stats.
*
* TODO: Consider having this function accept "product" and "metricset", and
* concatenate them together to form the dataset, to avoid repetition, or
* provide types to narrow the valid values for dataset and metricset.
*
* @param {string} type matches legacy data
* @param {string} metricset matches standalone beats
* @param {string} dataset matches agent integration data streams
*/
export const createDatasetFilter = (type: string, metricset: string, dataset: string) => ({
bool: {
should: [
{
term: {
type: legacyType,
type,
},
},
{
term: {
'metricset.name': metricset,
},
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,39 @@ describe('fetchCCReadExceptions', () => {
bool: {
filter: [
{
nested: {
path: 'ccr_stats.read_exceptions',
query: { exists: { field: 'ccr_stats.read_exceptions.exception' } },
bool: {
should: [
{
nested: {
ignore_unmapped: true,
path: 'ccr_stats.read_exceptions',
query: {
exists: {
field: 'ccr_stats.read_exceptions.exception',
},
},
},
},
{
nested: {
ignore_unmapped: true,
path: 'elasticsearch.ccr.read_exceptions',
query: {
exists: {
field: 'elasticsearch.ccr.read_exceptions.exception',
},
},
},
},
],
minimum_should_match: 1,
},
},
{
bool: {
should: [
{ term: { type: 'ccr_stats' } },
{ term: { 'metricset.name': 'ccr' } },
{ term: { 'data_stream.dataset': 'elasticsearch.ccr' } },
],
minimum_should_match: 1,
Expand All @@ -82,9 +106,13 @@ describe('fetchCCReadExceptions', () => {
_source: {
includes: [
'cluster_uuid',
'elasticsearch.cluster.id',
'ccr_stats.read_exceptions',
'elasticsearch.ccr.read_exceptions',
'ccr_stats.shard_id',
'elasticsearch.ccr.shard_id',
'ccr_stats.leader_index',
'elasticsearch.ccr.leader.index',
],
},
size: 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,35 @@ export async function fetchCCRReadExceptions(
bool: {
filter: [
{
nested: {
path: 'ccr_stats.read_exceptions',
query: {
exists: {
field: 'ccr_stats.read_exceptions.exception',
bool: {
should: [
{
nested: {
ignore_unmapped: true,
path: 'ccr_stats.read_exceptions',
query: {
exists: {
field: 'ccr_stats.read_exceptions.exception',
},
},
},
},
},
{
nested: {
ignore_unmapped: true,
path: 'elasticsearch.ccr.read_exceptions',
query: {
exists: {
field: 'elasticsearch.ccr.read_exceptions.exception',
},
},
},
},
],
minimum_should_match: 1,
},
},
createDatasetFilter('ccr_stats', 'elasticsearch.ccr'),
createDatasetFilter('ccr_stats', 'ccr', 'elasticsearch.ccr'),
{
range: {
timestamp: {
Expand Down Expand Up @@ -83,9 +102,13 @@ export async function fetchCCRReadExceptions(
_source: {
includes: [
'cluster_uuid',
'elasticsearch.cluster.id',
'ccr_stats.read_exceptions',
'elasticsearch.ccr.read_exceptions',
'ccr_stats.shard_id',
'elasticsearch.ccr.shard_id',
'ccr_stats.leader_index',
'elasticsearch.ccr.leader.index',
],
},
size: 1,
Expand Down Expand Up @@ -123,15 +146,19 @@ export async function fetchCCRReadExceptions(

for (const followerIndexBucket of followerIndicesBuckets) {
const followerIndex = followerIndexBucket.key;
const {
_index: monitoringIndexName,
_source: { ccr_stats: ccrStats, cluster_uuid: clusterUuid },
} = get(followerIndexBucket, 'hits.hits.hits[0]');
const {
read_exceptions: readExceptions,
leader_index: leaderIndex,
shard_id: shardId,
} = ccrStats;
const clusterUuid =
get(followerIndexBucket, 'hits.hits.hits[0]._source.cluster_uuid') ||
get(followerIndexBucket, 'hits.hits.hits[0]_source.elasticsearch.cluster.id');

const monitoringIndexName = get(followerIndexBucket, 'hits.hits.hits[0]._index');
const ccrStats =
get(followerIndexBucket, 'hits.hits.hits[0]._source.ccr_stats') ||
get(followerIndexBucket, 'hits.hits.hits[0]._source.elasticsearch.ccr');

const { read_exceptions: readExceptions, shard_id: shardId } = ccrStats;

const leaderIndex = ccrStats.leaderIndex || ccrStats.leader.index;

const { exception: lastReadException } = readExceptions[readExceptions.length - 1];

stats.push({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ describe('fetchClusterHealth', () => {
'*:.monitoring-es-*,.monitoring-es-*,*:metrics-elasticsearch.cluster_stats-*,metrics-elasticsearch.cluster_stats-*',
filter_path: [
'hits.hits._source.cluster_state.status',
'hits.hits._source.elasticsearch.cluster.stats.status',
'hits.hits._source.cluster_uuid',
'hits.hits._source.elasticsearch.cluster.id',
'hits.hits._index',
],
body: {
Expand All @@ -79,6 +81,7 @@ describe('fetchClusterHealth', () => {
bool: {
should: [
{ term: { type: 'cluster_stats' } },
{ term: { 'metricset.name': 'cluster_stats' } },
{ term: { 'data_stream.dataset': 'elasticsearch.cluster_stats' } },
],
minimum_should_match: 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ export async function fetchClusterHealth(
index: indexPatterns,
filter_path: [
'hits.hits._source.cluster_state.status',
'hits.hits._source.elasticsearch.cluster.stats.status',
'hits.hits._source.cluster_uuid',
'hits.hits._source.elasticsearch.cluster.id',
'hits.hits._index',
],
body: {
Expand All @@ -48,7 +50,7 @@ export async function fetchClusterHealth(
cluster_uuid: clusters.map((cluster) => cluster.clusterUuid),
},
},
createDatasetFilter('cluster_stats', 'elasticsearch.cluster_stats'),
createDatasetFilter('cluster_stats', 'cluster_stats', 'elasticsearch.cluster_stats'),
{
range: {
timestamp: {
Expand Down Expand Up @@ -77,8 +79,9 @@ export async function fetchClusterHealth(
const response = await esClient.search<ElasticsearchSource>(params);
return (response.hits?.hits ?? []).map((hit) => {
return {
health: hit._source!.cluster_state?.status,
clusterUuid: hit._source!.cluster_uuid,
health:
hit._source!.cluster_state?.status || hit._source!.elasticsearch?.cluster?.stats?.status,
clusterUuid: hit._source!.cluster_uuid || hit._source!.elasticsearch?.cluster?.id,
ccs: hit._index.includes(':') ? hit._index.split(':')[0] : undefined,
} as AlertClusterHealth;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ describe('fetchClusters', () => {
filter_path: [
'hits.hits._source.cluster_settings.cluster.metadata.display_name',
'hits.hits._source.cluster_uuid',
'hits.hits._source.elasticsearch.cluster.id',
'hits.hits._source.cluster_name',
'hits.hits._source.elasticsearch.cluster.name',
],
body: {
size: 1000,
Expand All @@ -98,6 +100,7 @@ describe('fetchClusters', () => {
bool: {
should: [
{ term: { type: 'cluster_stats' } },
{ term: { 'metricset.name': 'cluster_stats' } },
{ term: { 'data_stream.dataset': 'elasticsearch.cluster_stats' } },
],
minimum_should_match: 1,
Expand Down
55 changes: 7 additions & 48 deletions x-pack/plugins/monitoring/server/lib/alerts/fetch_clusters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ export async function fetchClusters(
filter_path: [
'hits.hits._source.cluster_settings.cluster.metadata.display_name',
'hits.hits._source.cluster_uuid',
'hits.hits._source.elasticsearch.cluster.id',
'hits.hits._source.cluster_name',
'hits.hits._source.elasticsearch.cluster.name',
],
body: {
size: 1000,
query: {
bool: {
filter: [
createDatasetFilter('cluster_stats', 'elasticsearch.cluster_stats'),
createDatasetFilter('cluster_stats', 'cluster_stats', 'elasticsearch.cluster_stats'),
{
range: rangeFilter,
},
Expand All @@ -56,59 +58,16 @@ export async function fetchClusters(
};

const response = await esClient.search(params);
return get(response, 'hits.hits', []).map((hit: any) => {
const clusterName: string =
get(hit, '_source.cluster_settings.cluster.metadata.display_name') ||
get(hit, '_source.cluster_name') ||
get(hit, '_source.cluster_uuid');
return {
clusterUuid: get(hit, '_source.cluster_uuid'),
clusterName,
};
});
}

export async function fetchClustersLegacy(
callCluster: any,
index: string,
rangeFilter: RangeFilter = { timestamp: { gte: 'now-2m' } }
): Promise<AlertCluster[]> {
const params = {
index,
filter_path: [
'hits.hits._source.cluster_settings.cluster.metadata.display_name',
'hits.hits._source.cluster_uuid',
'hits.hits._source.cluster_name',
],
body: {
size: 1000,
query: {
bool: {
filter: [
{
term: {
type: 'cluster_stats',
},
},
{
range: rangeFilter,
},
],
},
},
collapse: {
field: 'cluster_uuid',
},
},
};
const response = await callCluster('search', params);
return get(response, 'hits.hits', []).map((hit: any) => {
const clusterName: string =
get(hit, '_source.cluster_settings.cluster.metadata.display_name') ||
get(hit, '_source.cluster_name') ||
get(hit, '_source.cluster_uuid');
get(hit, '_source.elasticsearch.cluster.name') ||
get(hit, '_source.cluster_uuid') ||
get(hit, '_source.elasticsearch.cluster.id');
return {
clusterUuid: get(hit, '_source.cluster_uuid'),
clusterUuid: get(hit, '_source.cluster_uuid') || get(hit, '_source.elasticsearch.cluster.id'),
clusterName,
};
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ describe('fetchCpuUsageNodeStats', () => {
bool: {
should: [
{ term: { type: 'node_stats' } },
{ term: { 'metricset.name': 'node_stats' } },
{ term: { 'data_stream.dataset': 'elasticsearch.node_stats' } },
],
minimum_should_match: 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export async function fetchCpuUsageNodeStats(
cluster_uuid: clusters.map((cluster) => cluster.clusterUuid),
},
},
createDatasetFilter('node_stats', 'elasticsearch.node_stats'),
createDatasetFilter('node_stats', 'node_stats', 'elasticsearch.node_stats'),
{
range: {
timestamp: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ describe('fetchDiskUsageNodeStats', () => {
bool: {
should: [
{ term: { type: 'node_stats' } },
{ term: { 'metricset.name': 'node_stats' } },
{ term: { 'data_stream.dataset': 'elasticsearch.node_stats' } },
],
minimum_should_match: 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export async function fetchDiskUsageNodeStats(
cluster_uuid: clustersIds,
},
},
createDatasetFilter('node_stats', 'elasticsearch.node_stats'),
createDatasetFilter('node_stats', 'node_stats', 'elasticsearch.node_stats'),
{
range: {
timestamp: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@ describe('fetchElasticsearchVersions', () => {
'*:.monitoring-es-*,.monitoring-es-*,*:metrics-elasticsearch.cluster_stats-*,metrics-elasticsearch.cluster_stats-*',
filter_path: [
'hits.hits._source.cluster_stats.nodes.versions',
'hits.hits._source.elasticsearch.cluster.stats.nodes.versions',
'hits.hits._index',
'hits.hits._source.cluster_uuid',
'hits.hits._source.elasticsearch.cluster.id',
],
body: {
size: 1,
Expand All @@ -84,6 +86,7 @@ describe('fetchElasticsearchVersions', () => {
bool: {
should: [
{ term: { type: 'cluster_stats' } },
{ term: { 'metricset.name': 'cluster_stats' } },
{ term: { 'data_stream.dataset': 'elasticsearch.cluster_stats' } },
],
minimum_should_match: 1,
Expand Down
Loading