@@ -66,18 +66,7 @@ export const elasticsearchMonitorStatesAdapter: UMMonitorStatesAdapter = {
6666 ) ;
6767
6868 // Calculate the total, up, and down counts.
69- const counts = await fastStatusCount ( context ) ;
70-
71- // Check if the last count was accurate, if not, we need to perform a slower count with the
72- // MonitorGroupsIterator.
73- if ( ! ( await context . hasTimespan ( ) ) ) {
74- // Figure out whether 'up' or 'down' is more common. It's faster to count the lower cardinality
75- // one then use subtraction to figure out its opposite.
76- const [ leastCommonStatus , mostCommonStatus ] : Array < 'up' | 'down' > =
77- counts . up > counts . down ? [ 'down' , 'up' ] : [ 'up' , 'down' ] ;
78- counts [ leastCommonStatus ] = await slowStatusCount ( context , leastCommonStatus ) ;
79- counts [ mostCommonStatus ] = counts . total - counts [ leastCommonStatus ] ;
80- }
69+ const counts = await statusCount ( context ) ;
8170
8271 return {
8372 total : statusFilter ? counts [ statusFilter ] : counts . total ,
@@ -110,45 +99,122 @@ const jsonifyPagination = (p: any): string | null => {
11099 return JSON . stringify ( p ) ;
111100} ;
112101
113- const fastStatusCount = async ( context : QueryContext ) : Promise < Snapshot > => {
114- const params = {
102+ const statusCount = async ( context : QueryContext ) : Promise < Snapshot > => {
103+ const res = await context . search ( {
115104 index : INDEX_NAMES . HEARTBEAT ,
116- body : {
117- size : 0 ,
118- query : { bool : { filter : await context . dateAndCustomFilters ( ) } } ,
119- aggs : {
120- unique : {
121- // We set the precision threshold to 40k which is the max precision supported by cardinality
122- cardinality : { field : 'monitor.id' , precision_threshold : 40000 } ,
123- } ,
124- down : {
125- filter : { range : { 'summary.down' : { gt : 0 } } } ,
126- aggs : {
127- unique : { cardinality : { field : 'monitor.id' , precision_threshold : 40000 } } ,
128- } ,
129- } ,
130- } ,
131- } ,
132- } ;
105+ body : statusCountBody ( await context . dateAndCustomFilters ( ) ) ,
106+ } )
133107
134- const statistics = await context . search ( params ) ;
135- const total = statistics . aggregations . unique . value ;
136- const down = statistics . aggregations . down . unique . value ;
108+ console . log ( "RES" , res ) ;
137109
138- return {
139- total,
140- down,
141- up : total - down ,
142- } ;
143- } ;
110+ return res . aggregations . counts . value ;
111+ }
144112
145- const slowStatusCount = async ( context : QueryContext , status : string ) : Promise < number > => {
146- const downContext = context . clone ( ) ;
147- downContext . statusFilter = status ;
148- const iterator = new MonitorGroupIterator ( downContext ) ;
149- let count = 0 ;
150- while ( await iterator . next ( ) ) {
151- count ++ ;
113+ const statusCountBody = ( filters : any ) : any => {
114+ return {
115+ size : 0 ,
116+ query : {
117+ bool : {
118+ filter : [
119+ {
120+ exists : {
121+ field : "summary"
122+ }
123+ } ,
124+ filters ,
125+ ]
126+ }
127+ } ,
128+ aggs : {
129+ counts : {
130+ scripted_metric : {
131+ init_script : "state.locStatus = new HashMap(); state.totalDocs = 0;" ,
132+ map_script : `
133+ def loc = doc["observer.geo.name"];
134+ String idLoc = loc == null ? doc["monitor.id"][0] + "$" : doc["monitor.id"][0] + "$" + loc[0];
135+
136+ String status = doc["summary.down"][0] > 0 ? "d" : "u";
137+ String timeAndStatus = doc["@timestamp"][0].toInstant().toEpochMilli().toString() + status;
138+ state.locStatus[idLoc] = timeAndStatus;
139+ state.totalDocs++;
140+ ` ,
141+ combine_script : `
142+ return state;
143+ ` ,
144+ reduce_script : `
145+ // Use a treemap since it's later traversable in sorted order
146+ TreeMap locStatus = new TreeMap();
147+ long totalDocs = 0;
148+ int uniqueIds = 0;
149+ for (state in states) {
150+ totalDocs += state.totalDocs;
151+ for (entry in state.locStatus.entrySet()) {
152+ locStatus.merge(entry.getKey(), entry.getValue(), (a,b) -> a.compareTo(b) > 0 ? a : b)
153+ }
154+ }
155+
156+
157+ HashMap locTotals = new HashMap();
158+ int total = 0;
159+ int down = 0;
160+ String curId = "";
161+ boolean curIdDown = false;
162+ for (entry in locStatus.entrySet()) {
163+ String idLoc = entry.getKey();
164+ String timeStatus = entry.getValue();
165+ int splitAt = idLoc.lastIndexOf("$");
166+ String id = idLoc.substring(0, splitAt);
167+ String loc = idLoc.substring(splitAt+1);
168+ String status = timeStatus.substring(timeStatus.length() - 1);
169+
170+ locTotals.compute(loc, (k,v) -> {
171+ HashMap res = v;
172+ if (v == null) {
173+ res = new HashMap();
174+ res.put('up', 0);
175+ res.put('down', 0);
176+ }
177+
178+ if (status == 'u') {
179+ res.up++;
180+ } else {
181+ res.down++;
182+ }
183+
184+ return res;
185+ });
186+
187+
188+ // We've encountered a new ID
189+ if (curId != id) {
190+ total++;
191+ curId = id;
192+ if (status == "d") {
193+ curIdDown = true;
194+ down++;
195+ } else {
196+ curIdDown = false;
197+ }
198+ } else if (!curIdDown) {
199+ if (status == "d") {
200+ curIdDown = true;
201+ down++;
202+ } else {
203+ curIdDown = false;
204+ }
205+ }
206+ }
207+
208+ Map result = new HashMap();
209+ result.total = total;
210+ result.location_totals = locTotals;
211+ result.up = total-down;
212+ result.down = down;
213+ result.totalDocs = totalDocs;
214+ return result;
215+ `
216+ }
217+ }
218+ }
152219 }
153- return count ;
154- } ;
220+ }
0 commit comments