66
77import { UMElasticsearchQueryFn } from '../adapters' ;
88import { Snapshot } from '../../../common/runtime_types' ;
9- import { QueryContext , MonitorGroupIterator } from './search' ;
9+ import { QueryContext } from './search' ;
1010import { CONTEXT_DEFAULTS , INDEX_NAMES } from '../../../common/constants' ;
1111
1212export interface GetSnapshotCountParams {
@@ -16,49 +16,6 @@ export interface GetSnapshotCountParams {
1616 statusFilter ?: string ;
1717}
1818
19- const fastStatusCount = async ( context : QueryContext ) : Promise < Snapshot > => {
20- const params = {
21- index : INDEX_NAMES . HEARTBEAT ,
22- body : {
23- size : 0 ,
24- query : { bool : { filter : await context . dateAndCustomFilters ( ) } } ,
25- aggs : {
26- unique : {
27- // We set the precision threshold to 40k which is the max precision supported by cardinality
28- cardinality : { field : 'monitor.id' , precision_threshold : 40000 } ,
29- } ,
30- down : {
31- filter : { range : { 'summary.down' : { gt : 0 } } } ,
32- aggs : {
33- unique : { cardinality : { field : 'monitor.id' , precision_threshold : 40000 } } ,
34- } ,
35- } ,
36- } ,
37- } ,
38- } ;
39-
40- const statistics = await context . search ( params ) ;
41- const total = statistics . aggregations . unique . value ;
42- const down = statistics . aggregations . down . unique . value ;
43-
44- return {
45- total,
46- down,
47- up : total - down ,
48- } ;
49- } ;
50-
51- const slowStatusCount = async ( context : QueryContext , status : string ) : Promise < number > => {
52- const downContext = context . clone ( ) ;
53- downContext . statusFilter = status ;
54- const iterator = new MonitorGroupIterator ( downContext ) ;
55- let count = 0 ;
56- while ( await iterator . next ( ) ) {
57- count ++ ;
58- }
59- return count ;
60- } ;
61-
6219export const getSnapshotCount : UMElasticsearchQueryFn < GetSnapshotCountParams , Snapshot > = async ( {
6320 callES,
6421 dateRangeStart,
@@ -81,22 +38,147 @@ export const getSnapshotCount: UMElasticsearchQueryFn<GetSnapshotCountParams, Sn
8138 ) ;
8239
8340 // Calculate the total, up, and down counts.
84- const counts = await fastStatusCount ( context ) ;
85-
86- // Check if the last count was accurate, if not, we need to perform a slower count with the
87- // MonitorGroupsIterator.
88- if ( ! ( await context . hasTimespan ( ) ) ) {
89- // Figure out whether 'up' or 'down' is more common. It's faster to count the lower cardinality
90- // one then use subtraction to figure out its opposite.
91- const [ leastCommonStatus , mostCommonStatus ] : Array < 'up' | 'down' > =
92- counts . up > counts . down ? [ 'down' , 'up' ] : [ 'up' , 'down' ] ;
93- counts [ leastCommonStatus ] = await slowStatusCount ( context , leastCommonStatus ) ;
94- counts [ mostCommonStatus ] = counts . total - counts [ leastCommonStatus ] ;
95- }
41+ const counts = await statusCount ( context ) ;
9642
9743 return {
9844 total : statusFilter ? counts [ statusFilter ] : counts . total ,
9945 up : statusFilter === 'down' ? 0 : counts . up ,
10046 down : statusFilter === 'up' ? 0 : counts . down ,
10147 } ;
10248} ;
49+
50+ const statusCount = async ( context : QueryContext ) : Promise < Snapshot > => {
51+ const res = await context . search ( {
52+ index : INDEX_NAMES . HEARTBEAT ,
53+ body : statusCountBody ( await context . dateAndCustomFilters ( ) ) ,
54+ } ) ;
55+
56+ return res . aggregations . counts . value ;
57+ } ;
58+
59+ const statusCountBody = ( filters : any ) : any => {
60+ return {
61+ size : 0 ,
62+ query : {
63+ bool : {
64+ filter : [
65+ {
66+ exists : {
67+ field : 'summary' ,
68+ } ,
69+ } ,
70+ filters ,
71+ ] ,
72+ } ,
73+ } ,
74+ aggs : {
75+ counts : {
76+ scripted_metric : {
77+ init_script : 'state.locStatus = new HashMap(); state.totalDocs = 0;' ,
78+ map_script : `
79+ def loc = doc["observer.geo.name"].size() == 0 ? "" : doc["observer.geo.name"][0];
80+
81+ // One concern here is memory since we could build pretty gigantic maps. I've opted to
82+ // stick to a simple <String,String> map to reduce memory overhead. This means we do
83+ // a little string parsing to treat these strings as records that stay lexicographically
84+ // sortable (which is important later).
85+ // We encode the ID and location as $id.len:$id$loc
86+ String id = doc["monitor.id"][0];
87+ String idLenDelim = Integer.toHexString(id.length()) + ":" + id;
88+ String idLoc = loc == null ? idLenDelim : idLenDelim + loc;
89+
90+ String status = doc["summary.down"][0] > 0 ? "d" : "u";
91+ String timeAndStatus = doc["@timestamp"][0].toInstant().toEpochMilli().toString() + status;
92+ state.locStatus[idLoc] = timeAndStatus;
93+ state.totalDocs++;
94+ ` ,
95+ combine_script : `
96+ return state;
97+ ` ,
98+ reduce_script : `
99+ // Use a treemap since it's traversable in sorted order.
100+ // This is important later.
101+ TreeMap locStatus = new TreeMap();
102+ long totalDocs = 0;
103+ int uniqueIds = 0;
104+ for (state in states) {
105+ totalDocs += state.totalDocs;
106+ for (entry in state.locStatus.entrySet()) {
107+ // Update the value for the given key if we have a more recent check from this location.
108+ locStatus.merge(entry.getKey(), entry.getValue(), (a,b) -> a.compareTo(b) > 0 ? a : b)
109+ }
110+ }
111+
112+ HashMap locTotals = new HashMap();
113+ int total = 0;
114+ int down = 0;
115+ String curId = "";
116+ boolean curIdDown = false;
117+ // We now iterate through our tree map in order, which means records for a given ID
118+ // always are encountered one after the other. This saves us having to make an intermediate
119+ // map.
120+ for (entry in locStatus.entrySet()) {
121+ String idLoc = entry.getKey();
122+ String timeStatus = entry.getValue();
123+
124+ // Parse the length delimited id/location strings described in the map section
125+ int colonIndex = idLoc.indexOf(":");
126+ int idEnd = Integer.parseInt(idLoc.substring(0, colonIndex), 16) + colonIndex + 1;
127+ String id = idLoc.substring(colonIndex + 1, idEnd);
128+ String loc = idLoc.substring(idEnd, idLoc.length());
129+ String status = timeStatus.substring(timeStatus.length() - 1);
130+
131+ // Here we increment counters for the up/down key per location
132+ // We also create a new hashmap in locTotals if we've never seen this location
133+ // before.
134+ locTotals.compute(loc, (k,v) -> {
135+ HashMap res = v;
136+ if (v == null) {
137+ res = new HashMap();
138+ res.put('up', 0);
139+ res.put('down', 0);
140+ }
141+
142+ if (status == 'u') {
143+ res.up++;
144+ } else {
145+ res.down++;
146+ }
147+
148+ return res;
149+ });
150+
151+
152+ // We've encountered a new ID
153+ if (curId != id) {
154+ total++;
155+ curId = id;
156+ if (status == "d") {
157+ curIdDown = true;
158+ down++;
159+ } else {
160+ curIdDown = false;
161+ }
162+ } else if (!curIdDown) {
163+ if (status == "d") {
164+ curIdDown = true;
165+ down++;
166+ } else {
167+ curIdDown = false;
168+ }
169+ }
170+ }
171+
172+ Map result = new HashMap();
173+ result.total = total;
174+ result.location_totals = locTotals;
175+ result.up = total - down;
176+ result.down = down;
177+ result.totalDocs = totalDocs;
178+ return result;
179+ ` ,
180+ } ,
181+ } ,
182+ } ,
183+ } ;
184+ } ;
0 commit comments