Skip to content

Commit 57041a2

Browse files
committed
ensure workload agg doesnt run until next interval when it fails (#83632)
Ensures the WorkloadAggregator doesn't retry immediately after errors, and instead retries on the next interval.
1 parent 749c22b commit 57041a2

File tree

2 files changed

+38
-3
lines changed

2 files changed

+38
-3
lines changed

x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,41 @@ describe('Workload Statistics Aggregator', () => {
477477
}, reject);
478478
});
479479
});
480+
481+
test('recovery after errors occurrs at the next interval', async () => {
482+
const refreshInterval = 1000;
483+
484+
const taskStore = taskStoreMock.create({});
485+
const logger = loggingSystemMock.create().get();
486+
const workloadAggregator = createWorkloadAggregator(
487+
taskStore,
488+
of(true),
489+
refreshInterval,
490+
3000,
491+
logger
492+
);
493+
494+
return new Promise((resolve, reject) => {
495+
let errorWasThrowAt = 0;
496+
taskStore.aggregate.mockImplementation(async () => {
497+
if (errorWasThrowAt === 0) {
498+
errorWasThrowAt = Date.now();
499+
throw new Error(`Elasticsearch has gone poof`);
500+
} else if (Date.now() - errorWasThrowAt < refreshInterval) {
501+
reject(new Error(`Elasticsearch is still poof`));
502+
}
503+
504+
return setTaskTypeCount(mockAggregatedResult(), 'alerting_telemetry', {
505+
idle: 2,
506+
});
507+
});
508+
509+
workloadAggregator.pipe(take(2), bufferCount(2)).subscribe((results) => {
510+
expect(results.length).toEqual(2);
511+
resolve();
512+
}, reject);
513+
});
514+
});
480515
});
481516

482517
describe('estimateRecurringTaskScheduling', () => {

x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*/
66

77
import { combineLatest, Observable, timer } from 'rxjs';
8-
import { mergeMap, map, filter, catchError } from 'rxjs/operators';
8+
import { mergeMap, map, filter, switchMap, catchError } from 'rxjs/operators';
99
import { Logger } from 'src/core/server';
1010
import { JsonObject } from 'src/plugins/kibana_utils/common';
1111
import { keyBy, mapValues } from 'lodash';
@@ -222,8 +222,8 @@ export function createWorkloadAggregator(
222222
}),
223223
catchError((ex: Error, caught) => {
224224
logger.error(`[WorkloadAggregator]: ${ex}`);
225-
// continue to pull values from the same observable
226-
return caught;
225+
// continue to pull values from the same observable but only on the next refreshInterval
226+
return timer(refreshInterval).pipe(switchMap(() => caught));
227227
})
228228
);
229229
}

0 commit comments

Comments
 (0)