diff --git a/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts
new file mode 100644
index 0000000000000..443c811469002
--- /dev/null
+++ b/x-pack/plugins/task_manager/server/integration_tests/managed_configuration.test.ts
@@ -0,0 +1,105 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+import sinon from 'sinon';
+import { mockLogger } from '../test_utils';
+import { TaskManager } from '../task_manager';
+import { savedObjectsRepositoryMock } from '../../../../../src/core/server/mocks';
+import {
+  SavedObjectsSerializer,
+  SavedObjectTypeRegistry,
+  SavedObjectsErrorHelpers,
+} from '../../../../../src/core/server';
+import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration';
+
+describe('managed configuration', () => {
+  let taskManager: TaskManager;
+  let clock: sinon.SinonFakeTimers;
+  const callAsInternalUser = jest.fn();
+  const logger = mockLogger();
+  const serializer = new SavedObjectsSerializer(new SavedObjectTypeRegistry());
+  const savedObjectsClient = savedObjectsRepositoryMock.create();
+  const config = {
+    enabled: true,
+    max_workers: 10,
+    index: 'foo',
+    max_attempts: 9,
+    poll_interval: 3000,
+    max_poll_inactivity_cycles: 10,
+    request_capacity: 1000,
+  };
+
+  beforeEach(() => {
+    jest.resetAllMocks();
+    callAsInternalUser.mockResolvedValue({ total: 0, updated: 0, version_conflicts: 0 });
+    clock = sinon.useFakeTimers();
+    taskManager = new TaskManager({
+      config,
+      logger,
+      serializer,
+      callAsInternalUser,
+      taskManagerId: 'some-uuid',
+      savedObjectsRepository: savedObjectsClient,
+    });
+    taskManager.registerTaskDefinitions({
+      foo: {
+        type: 'foo',
+        title: 'Foo',
+        createTaskRunner: jest.fn(),
+      },
+    });
+    taskManager.start();
+    // force rxjs timers to fire when they are scheduled for setTimeout(0) as the
+    // sinon fake timers cause them to stall
+    clock.tick(0);
+  });
+
+  afterEach(() => clock.restore());
+
+  test('should lower max workers when Elasticsearch returns 429 error', async () => {
+    savedObjectsClient.create.mockRejectedValueOnce(
+      SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
+    );
+    // Cause "too many requests" error to be thrown
+    await expect(
+      taskManager.schedule({
+        taskType: 'foo',
+        state: {},
+        params: {},
+      })
+    ).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
+    clock.tick(ADJUST_THROUGHPUT_INTERVAL);
+    expect(logger.warn).toHaveBeenCalledWith(
+      'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
+    );
+    expect(logger.debug).toHaveBeenCalledWith(
+      'Max workers configuration changing from 10 to 8 after seeing 1 error(s)'
+    );
+    expect(logger.debug).toHaveBeenCalledWith('Task pool now using 8 as the max worker value');
+  });
+
+  test('should increase poll interval when Elasticsearch returns 429 error', async () => {
+    savedObjectsClient.create.mockRejectedValueOnce(
+      SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
+    );
+    // Cause "too many requests" error to be thrown
+    await expect(
+      taskManager.schedule({
+        taskType: 'foo',
+        state: {},
+        params: {},
+      })
+    ).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
+    clock.tick(ADJUST_THROUGHPUT_INTERVAL);
+    expect(logger.warn).toHaveBeenCalledWith(
+      'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
+    );
+    expect(logger.debug).toHaveBeenCalledWith(
+      'Poll interval configuration changing from 3000 to 3600 after seeing 1 error(s)'
+    );
+    expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms');
+  });
+});
diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts
new file mode 100644
index 0000000000000..b6b5cd003c5d4
--- /dev/null
+++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts
@@ -0,0 +1,213 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+import sinon from 'sinon';
+import { Subject } from 'rxjs';
+import { mockLogger } from '../test_utils';
+import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
+import {
+  createManagedConfiguration,
+  ADJUST_THROUGHPUT_INTERVAL,
+} from './create_managed_configuration';
+
+describe('createManagedConfiguration()', () => {
+  let clock: sinon.SinonFakeTimers;
+  const logger = mockLogger();
+
+  beforeEach(() => {
+    jest.resetAllMocks();
+    clock = sinon.useFakeTimers();
+  });
+
+  afterEach(() => clock.restore());
+
+  test('returns observables with initialized values', async () => {
+    const maxWorkersSubscription = jest.fn();
+    const pollIntervalSubscription = jest.fn();
+    const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
+      logger,
+      errors$: new Subject<Error>(),
+      startingMaxWorkers: 1,
+      startingPollInterval: 2,
+    });
+    maxWorkersConfiguration$.subscribe(maxWorkersSubscription);
+    pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
+    expect(maxWorkersSubscription).toHaveBeenCalledTimes(1);
+    expect(maxWorkersSubscription).toHaveBeenNthCalledWith(1, 1);
+    expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
+    expect(pollIntervalSubscription).toHaveBeenNthCalledWith(1, 2);
+  });
+
+  test(`skips errors that aren't about too many requests`, async () => {
+    const maxWorkersSubscription = jest.fn();
+    const pollIntervalSubscription = jest.fn();
+    const errors$ = new Subject<Error>();
+    const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
+      errors$,
+      logger,
+      startingMaxWorkers: 100,
+      startingPollInterval: 100,
+    });
+    maxWorkersConfiguration$.subscribe(maxWorkersSubscription);
+    pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
+    errors$.next(new Error('foo'));
+    clock.tick(ADJUST_THROUGHPUT_INTERVAL);
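+    // a plain Error is not a "too many requests" error, so neither configuration should change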
+    expect(maxWorkersSubscription).toHaveBeenCalledTimes(1);
+    expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
+  });
+
+  describe('maxWorker configuration', () => {
+    function setupScenario(startingMaxWorkers: number) {
+      const errors$ = new Subject<Error>();
+      const subscription = jest.fn();
+      const { maxWorkersConfiguration$ } = createManagedConfiguration({
+        errors$,
+        startingMaxWorkers,
+        logger,
+        startingPollInterval: 1,
+      });
+      maxWorkersConfiguration$.subscribe(subscription);
+      return { subscription, errors$ };
+    }
+
+    beforeEach(() => {
+      jest.resetAllMocks();
+      clock = sinon.useFakeTimers();
+    });
+
+    afterEach(() => clock.restore());
+
+    test('should decrease configuration at the next interval when an error is emitted', async () => {
+      const { subscription, errors$ } = setupScenario(100);
+      errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
+      clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
+      expect(subscription).toHaveBeenCalledTimes(1);
+      clock.tick(1);
+      expect(subscription).toHaveBeenCalledTimes(2);
+      expect(subscription).toHaveBeenNthCalledWith(2, 80);
+    });
+
+    test('should log a warning when the configuration changes from the starting value', async () => {
+      const { errors$ } = setupScenario(100);
+      errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
+      clock.tick(ADJUST_THROUGHPUT_INTERVAL);
+      expect(logger.warn).toHaveBeenCalledWith(
+        'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
+      );
+    });
+
+    test('should increase configuration back to normal incrementally after an error is emitted', async () => {
+      const { subscription, errors$ } = setupScenario(100);
+      errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
+      clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
+      expect(subscription).toHaveBeenNthCalledWith(2, 80);
+      expect(subscription).toHaveBeenNthCalledWith(3, 84);
+      // 88.2 -> 89 from Math.ceil
+      expect(subscription).toHaveBeenNthCalledWith(4, 89);
+      expect(subscription).toHaveBeenNthCalledWith(5, 94);
+      expect(subscription).toHaveBeenNthCalledWith(6, 99);
+      // 103.95 -> 100 from Math.min with starting value
+      expect(subscription).toHaveBeenNthCalledWith(7, 100);
+      // No new calls due to value not changing and usage of distinctUntilChanged()
+      expect(subscription).toHaveBeenCalledTimes(7);
+    });
+
+    test('should keep reducing configuration when errors keep emitting', async () => {
+      const { subscription, errors$ } = setupScenario(100);
+      for (let i = 0; i < 20; i++) {
+        errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
+        clock.tick(ADJUST_THROUGHPUT_INTERVAL);
+      }
+      expect(subscription).toHaveBeenNthCalledWith(2, 80);
+      expect(subscription).toHaveBeenNthCalledWith(3, 64);
+      // 51.2 -> 51 from Math.floor
+      expect(subscription).toHaveBeenNthCalledWith(4, 51);
+      expect(subscription).toHaveBeenNthCalledWith(5, 40);
+      expect(subscription).toHaveBeenNthCalledWith(6, 32);
+      expect(subscription).toHaveBeenNthCalledWith(7, 25);
+      expect(subscription).toHaveBeenNthCalledWith(8, 20);
+      expect(subscription).toHaveBeenNthCalledWith(9, 16);
+      expect(subscription).toHaveBeenNthCalledWith(10, 12);
+      expect(subscription).toHaveBeenNthCalledWith(11, 9);
+      expect(subscription).toHaveBeenNthCalledWith(12, 7);
+      expect(subscription).toHaveBeenNthCalledWith(13, 5);
+      expect(subscription).toHaveBeenNthCalledWith(14, 4);
+      expect(subscription).toHaveBeenNthCalledWith(15, 3);
+      expect(subscription).toHaveBeenNthCalledWith(16, 2);
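+      // Math.max(..., 1) in the implementation keeps the value from ever dropping below one worker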
+      expect(subscription).toHaveBeenNthCalledWith(17, 1);
+      // No new calls due to value not changing and usage of distinctUntilChanged()
+      expect(subscription).toHaveBeenCalledTimes(17);
+    });
+  });
+
+  describe('pollInterval configuration', () => {
+    function setupScenario(startingPollInterval: number) {
+      const errors$ = new Subject<Error>();
+      const subscription = jest.fn();
+      const { pollIntervalConfiguration$ } = createManagedConfiguration({
+        logger,
+        errors$,
+        startingPollInterval,
+        startingMaxWorkers: 1,
+      });
+      pollIntervalConfiguration$.subscribe(subscription);
+      return { subscription, errors$ };
+    }
+
+    beforeEach(() => {
+      jest.resetAllMocks();
+      clock = sinon.useFakeTimers();
+    });
+
+    afterEach(() => clock.restore());
+
+    test('should increase configuration at the next interval when an error is emitted', async () => {
+      const { subscription, errors$ } = setupScenario(100);
+      errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
+      clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
+      expect(subscription).toHaveBeenCalledTimes(1);
+      clock.tick(1);
+      expect(subscription).toHaveBeenCalledTimes(2);
+      expect(subscription).toHaveBeenNthCalledWith(2, 120);
+    });
+
+    test('should log a warning when the configuration changes from the starting value', async () => {
+      const { errors$ } = setupScenario(100);
+      errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
+      clock.tick(ADJUST_THROUGHPUT_INTERVAL);
+      expect(logger.warn).toHaveBeenCalledWith(
+        'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
+      );
+    });
+
+    test('should decrease configuration back to normal incrementally after an error is emitted', async () => {
+      const { subscription, errors$ } = setupScenario(100);
+      errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
+      clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
+      expect(subscription).toHaveBeenNthCalledWith(2, 120);
+      expect(subscription).toHaveBeenNthCalledWith(3, 114);
+      // 108.3 -> 108 from Math.floor
+      expect(subscription).toHaveBeenNthCalledWith(4, 108);
+      expect(subscription).toHaveBeenNthCalledWith(5, 102);
+      // 96.9 -> 100 from Math.max with the starting value
+      expect(subscription).toHaveBeenNthCalledWith(6, 100);
+      // No new calls due to value not changing and usage of distinctUntilChanged()
+      expect(subscription).toHaveBeenCalledTimes(6);
+    });
+
+    test('should increase configuration when errors keep emitting', async () => {
+      const { subscription, errors$ } = setupScenario(100);
+      for (let i = 0; i < 3; i++) {
+        errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
+        clock.tick(ADJUST_THROUGHPUT_INTERVAL);
+      }
+      expect(subscription).toHaveBeenNthCalledWith(2, 120);
+      expect(subscription).toHaveBeenNthCalledWith(3, 144);
+      // 172.8 -> 173 from Math.ceil
+      expect(subscription).toHaveBeenNthCalledWith(4, 173);
+    });
+  });
+});
diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts
new file mode 100644
index 0000000000000..3dc5fd50d3ca4
--- /dev/null
+++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts
@@ -0,0 +1,160 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+import { interval, merge, of, Observable } from 'rxjs';
+import { filter, mergeScan, map, scan, distinctUntilChanged, startWith } from 'rxjs/operators';
+import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
+import { Logger } from '../types';
+
+const FLUSH_MARKER = Symbol('flush');
+export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000;
+
+// When errors occur, reduce maxWorkers by MAX_WORKERS_DECREASE_PERCENTAGE
+// When errors no longer occur, start increasing maxWorkers by MAX_WORKERS_INCREASE_PERCENTAGE
+// until starting value is reached
+const MAX_WORKERS_DECREASE_PERCENTAGE = 0.8;
+const MAX_WORKERS_INCREASE_PERCENTAGE = 1.05;
+
+// When errors occur, increase pollInterval by POLL_INTERVAL_INCREASE_PERCENTAGE
+// When errors no longer occur, start decreasing pollInterval by POLL_INTERVAL_DECREASE_PERCENTAGE
+// until starting value is reached
+const POLL_INTERVAL_DECREASE_PERCENTAGE = 0.95;
+const POLL_INTERVAL_INCREASE_PERCENTAGE = 1.2;
+
+interface ManagedConfigurationOpts {
+  logger: Logger;
+  startingMaxWorkers: number;
+  startingPollInterval: number;
+  errors$: Observable<Error>;
+}
+
+interface ManagedConfiguration {
+  maxWorkersConfiguration$: Observable<number>;
+  pollIntervalConfiguration$: Observable<number>;
+}
+
+export function createManagedConfiguration({
+  logger,
+  startingMaxWorkers,
+  startingPollInterval,
+  errors$,
+}: ManagedConfigurationOpts): ManagedConfiguration {
+  const errorCheck$ = countErrors(errors$, ADJUST_THROUGHPUT_INTERVAL);
+  return {
+    maxWorkersConfiguration$: errorCheck$.pipe(
+      createMaxWorkersScan(logger, startingMaxWorkers),
+      startWith(startingMaxWorkers),
+      distinctUntilChanged()
+    ),
+    pollIntervalConfiguration$: errorCheck$.pipe(
+      createPollIntervalScan(logger, startingPollInterval),
+      startWith(startingPollInterval),
+      distinctUntilChanged()
+    ),
+  };
+}
+
+function createMaxWorkersScan(logger: Logger, startingMaxWorkers: number) {
+  return scan((previousMaxWorkers: number, errorCount: number) => {
+    let newMaxWorkers: number;
+    if (errorCount > 0) {
+      // Decrease max workers by MAX_WORKERS_DECREASE_PERCENTAGE while making sure it doesn't go lower than 1.
+      // Using Math.floor to make sure the number is different than previous while not being a decimal value.
+      newMaxWorkers = Math.max(Math.floor(previousMaxWorkers * MAX_WORKERS_DECREASE_PERCENTAGE), 1);
+    } else {
+      // Increase max workers by MAX_WORKERS_INCREASE_PERCENTAGE while making sure it doesn't go
+      // higher than the starting value. Using Math.ceil to make sure the number is different than
+      // previous while not being a decimal value
+      newMaxWorkers = Math.min(
+        startingMaxWorkers,
+        Math.ceil(previousMaxWorkers * MAX_WORKERS_INCREASE_PERCENTAGE)
+      );
+    }
+    if (newMaxWorkers !== previousMaxWorkers) {
+      logger.debug(
+        `Max workers configuration changing from ${previousMaxWorkers} to ${newMaxWorkers} after seeing ${errorCount} error(s)`
+      );
+      if (previousMaxWorkers === startingMaxWorkers) {
+        logger.warn(
+          `Max workers configuration is temporarily reduced after Elasticsearch returned ${errorCount} "too many request" error(s).`
+        );
+      }
+    }
+    return newMaxWorkers;
+  }, startingMaxWorkers);
+}
+
+function createPollIntervalScan(logger: Logger, startingPollInterval: number) {
+  return scan((previousPollInterval: number, errorCount: number) => {
+    let newPollInterval: number;
+    if (errorCount > 0) {
+      // Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to
+      // make sure the number is different than previous while not being a decimal value.
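+      // e.g. a 3000ms interval becomes Math.ceil(3000 * 1.2) = 3600ms once errors are seen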
+      newPollInterval = Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE);
+    } else {
+      // Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to
+      // make sure the number is different than previous while not being a decimal value.
+      newPollInterval = Math.max(
+        startingPollInterval,
+        Math.floor(previousPollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE)
+      );
+    }
+    if (newPollInterval !== previousPollInterval) {
+      logger.debug(
+        `Poll interval configuration changing from ${previousPollInterval} to ${newPollInterval} after seeing ${errorCount} error(s)`
+      );
+      if (previousPollInterval === startingPollInterval) {
+        logger.warn(
+          `Poll interval configuration is temporarily increased after Elasticsearch returned ${errorCount} "too many request" error(s).`
+        );
+      }
+    }
+    return newPollInterval;
+  }, startingPollInterval);
+}
+
+function countErrors(errors$: Observable<Error>, countInterval: number): Observable<number> {
+  return merge(
+    // Flush error count at fixed interval
+    interval(countInterval).pipe(map(() => FLUSH_MARKER)),
+    errors$.pipe(filter((e) => SavedObjectsErrorHelpers.isTooManyRequestsError(e)))
+  ).pipe(
+    // When the FLUSH_MARKER is emitted, emit the current error count and reset the counter
+    // Otherwise increment the error counter
+    mergeScan(({ count }, next) => {
+      return next === FLUSH_MARKER
+        ? of(emitErrorCount(count), resetErrorCount())
+        : of(incrementErrorCount(count));
+    }, emitErrorCount(0)),
+    filter(isEmitEvent),
+    map(({ count }) => count)
+  );
+}
+
+function emitErrorCount(count: number) {
+  return {
+    tag: 'emit',
+    count,
+  };
+}
+
+function isEmitEvent(event: { tag: string; count: number }) {
+  return event.tag === 'emit';
+}
+
+function incrementErrorCount(count: number) {
+  return {
+    tag: 'inc',
+    count: count + 1,
+  };
+}
+
+function resetErrorCount() {
+  return {
+    tag: 'initial',
+    count: 0,
+  };
+}
diff --git a/x-pack/plugins/task_manager/server/polling/observable_monitor.ts b/x-pack/plugins/task_manager/server/polling/observable_monitor.ts
index 7b06117ef59d1..b07bb6661163b 100644
--- a/x-pack/plugins/task_manager/server/polling/observable_monitor.ts
+++ b/x-pack/plugins/task_manager/server/polling/observable_monitor.ts
@@ -4,9 +4,9 @@
  * you may not use this file except in compliance with the Elastic License.
  */
 
-import { Subject, Observable, throwError, interval, timer, Subscription } from 'rxjs';
-import { exhaustMap, tap, takeUntil, switchMap, switchMapTo, catchError } from 'rxjs/operators';
+import { Subject, Observable, throwError, timer, Subscription } from 'rxjs';
 import { noop } from 'lodash';
+import { exhaustMap, tap, takeUntil, switchMap, switchMapTo, catchError } from 'rxjs/operators';
 
 const DEFAULT_HEARTBEAT_INTERVAL = 1000;
 
@@ -29,7 +29,7 @@ export function createObservableMonitor(
   }: ObservableMonitorOptions<E> = {}
 ): Observable<T> {
   return new Observable((subscriber) => {
-    const subscription: Subscription = interval(heartbeatInterval)
+    const subscription: Subscription = timer(0, heartbeatInterval)
       .pipe(
         // switch from the heartbeat interval to the instantiated observable until it completes / errors
         exhaustMap(() => takeUntilDurationOfInactivity(observableFactory(), inactivityTimeout)),
diff --git a/x-pack/plugins/task_manager/server/polling/task_poller.test.ts b/x-pack/plugins/task_manager/server/polling/task_poller.test.ts
index 607e2ac2b80fa..956c8b05f3860 100644
--- a/x-pack/plugins/task_manager/server/polling/task_poller.test.ts
+++ b/x-pack/plugins/task_manager/server/polling/task_poller.test.ts
@@ -5,11 +5,11 @@
  */
 
 import _ from 'lodash';
-import { Subject } from 'rxjs';
+import { Subject, of, BehaviorSubject } from 'rxjs';
 import { Option, none, some } from 'fp-ts/lib/Option';
 import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
 import { fakeSchedulers } from 'rxjs-marbles/jest';
-import { sleep, resolvable, Resolvable } from '../test_utils';
+import { sleep, resolvable, Resolvable, mockLogger } from '../test_utils';
 import { asOk, asErr } from '../lib/result_type';
 
 describe('TaskPoller', () => {
@@ -24,10 +24,12 @@
       const work = jest.fn(async () => true);
       createTaskPoller({
-        pollInterval,
+        logger: mockLogger(),
+        pollInterval$: of(pollInterval),
         bufferCapacity,
         getCapacity: () => 1,
         work,
+        workTimeout: pollInterval * 5,
         pollRequests$: new Subject<Option<string>>(),
       }).subscribe(() => {});
 
@@ -40,9 +42,52 @@
       await sleep(0);
       expect(work).toHaveBeenCalledTimes(1);
 
+      await sleep(0);
+      await sleep(0);
+      advance(pollInterval + 10);
+      await sleep(0);
+      expect(work).toHaveBeenCalledTimes(2);
+    })
+  );
+
+  test(
+    'poller adapts to pollInterval changes',
+    fakeSchedulers(async (advance) => {
+      const pollInterval = 100;
+      const pollInterval$ = new BehaviorSubject(pollInterval);
+      const bufferCapacity = 5;
+
+      const work = jest.fn(async () => true);
+      createTaskPoller({
+        logger: mockLogger(),
+        pollInterval$,
+        bufferCapacity,
+        getCapacity: () => 1,
+        work,
+        workTimeout: pollInterval * 5,
+        pollRequests$: new Subject<Option<string>>(),
+      }).subscribe(() => {});
+
+      // `work` is async, we have to force a node `tick`
       await sleep(0);
       advance(pollInterval);
+      expect(work).toHaveBeenCalledTimes(1);
+
+      pollInterval$.next(pollInterval * 2);
+
+      // `work` is async, we have to force a node `tick`
+      await sleep(0);
+      advance(pollInterval);
+      expect(work).toHaveBeenCalledTimes(1);
+      advance(pollInterval);
       expect(work).toHaveBeenCalledTimes(2);
+
+      pollInterval$.next(pollInterval / 2);
+
+      // `work` is async, we have to force a node `tick`
+      await sleep(0);
+      advance(pollInterval / 2);
+      expect(work).toHaveBeenCalledTimes(3);
     })
   );
 
@@ -56,9 +101,11 @@
       let hasCapacity = true;
       createTaskPoller({
-        pollInterval,
+        logger: mockLogger(),
+        pollInterval$: of(pollInterval),
         bufferCapacity,
         work,
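+        // keep the work timeout far above the poll interval so it never triggers in these tests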
+        workTimeout: pollInterval * 5,
         getCapacity: () => (hasCapacity ? 1 : 0),
         pollRequests$: new Subject<Option<string>>(),
       }).subscribe(() => {});
 
@@ -113,9 +160,11 @@
       const work = jest.fn(async () => true);
       const pollRequests$ = new Subject<Option<string>>();
       createTaskPoller({
-        pollInterval,
+        logger: mockLogger(),
+        pollInterval$: of(pollInterval),
         bufferCapacity,
         work,
+        workTimeout: pollInterval * 5,
         getCapacity: () => 1,
         pollRequests$,
       }).subscribe(jest.fn());
 
@@ -157,9 +206,11 @@
       const work = jest.fn(async () => true);
       const pollRequests$ = new Subject<Option<string>>();
       createTaskPoller({
-        pollInterval,
+        logger: mockLogger(),
+        pollInterval$: of(pollInterval),
         bufferCapacity,
         work,
+        workTimeout: pollInterval * 5,
         getCapacity: () => (hasCapacity ? 1 : 0),
         pollRequests$,
       }).subscribe(() => {});
 
@@ -200,9 +251,11 @@
       const work = jest.fn(async () => true);
       const pollRequests$ = new Subject<Option<string>>();
       createTaskPoller({
-        pollInterval,
+        logger: mockLogger(),
+        pollInterval$: of(pollInterval),
         bufferCapacity,
         work,
+        workTimeout: pollInterval * 5,
         getCapacity: () => 1,
         pollRequests$,
       }).subscribe(() => {});
 
@@ -235,7 +288,8 @@
       const handler = jest.fn();
       const pollRequests$ = new Subject<Option<string>>();
       createTaskPoller({
-        pollInterval,
+        logger: mockLogger(),
+        pollInterval$: of(pollInterval),
         bufferCapacity,
         work: async (...args) => {
           await worker;
@@ -285,7 +339,8 @@
       type ResolvableTupple = [string, PromiseLike<void> & Resolvable];
       const pollRequests$ = new Subject<Option<ResolvableTupple>>();
       createTaskPoller<[string, Resolvable], string[]>({
-        pollInterval,
+        logger: mockLogger(),
+        pollInterval$: of(pollInterval),
         bufferCapacity,
         work: async (...resolvables) => {
           await Promise.all(resolvables.map(([, future]) => future));
@@ -344,11 +399,13 @@
       const handler = jest.fn();
       const pollRequests$ = new Subject<Option<string>>();
       createTaskPoller({
-        pollInterval,
+        logger: mockLogger(),
+        pollInterval$: of(pollInterval),
         bufferCapacity,
         work: async (...args) => {
           throw new Error('failed to work');
         },
+        workTimeout: pollInterval * 5,
         getCapacity: () => 5,
         pollRequests$,
       }).subscribe(handler);
 
@@ -383,9 +440,11 @@
         return callCount;
       });
       createTaskPoller({
-        pollInterval,
+        logger: mockLogger(),
+        pollInterval$: of(pollInterval),
         bufferCapacity,
         work,
+        workTimeout: pollInterval * 5,
         getCapacity: () => 5,
         pollRequests$,
       }).subscribe(handler);
 
@@ -424,9 +483,11 @@
       const work = jest.fn(async () => {});
       const pollRequests$ = new Subject<Option<string>>();
       createTaskPoller({
-        pollInterval,
+        logger: mockLogger(),
+        pollInterval$: of(pollInterval),
         bufferCapacity,
         work,
+        workTimeout: pollInterval * 5,
         getCapacity: () => 5,
         pollRequests$,
       }).subscribe(handler);
diff --git a/x-pack/plugins/task_manager/server/polling/task_poller.ts b/x-pack/plugins/task_manager/server/polling/task_poller.ts
index a1435ffafe8f8..7515668a19d40 100644
--- a/x-pack/plugins/task_manager/server/polling/task_poller.ts
+++ b/x-pack/plugins/task_manager/server/polling/task_poller.ts
@@ -11,10 +11,11 @@
 import { performance } from 'perf_hooks';
 import { after } from 'lodash';
 import { Subject, merge, interval, of, Observable } from 'rxjs';
-import { mapTo, filter, scan, concatMap, tap, catchError } from 'rxjs/operators';
+import { mapTo, filter, scan, concatMap, tap, catchError, switchMap } from 'rxjs/operators';
 import { pipe } from 'fp-ts/lib/pipeable';
 import { Option, none, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';
+import { Logger } from '../types';
 import { pullFromSet } from '../lib/pull_from_set';
 import {
   Result,
@@ -30,12 +31,13 @@ import { timeoutPromiseAfter } from './timeout_promise_after';
 
 type WorkFn<T, H> = (...params: T[]) => Promise<H>;
 
 interface Opts<T, H> {
-  pollInterval: number;
+  logger: Logger;
+  pollInterval$: Observable<number>;
   bufferCapacity: number;
   getCapacity: () => number;
   pollRequests$: Observable<Option<T>>;
   work: WorkFn<T, H>;
-  workTimeout?: number;
+  workTimeout: number;
 }
 
 /**
@@ -52,7 +54,8 @@
  * of unique request arguments of type T. The queue holds all the buffered request arguments streamed in via pollRequests$
  */
 export function createTaskPoller<T, H>({
-  pollInterval,
+  logger,
+  pollInterval$,
   getCapacity,
   pollRequests$,
   bufferCapacity,
@@ -67,7 +70,13 @@
     // emit a polling event on demand
     pollRequests$,
     // emit a polling event on a fixed interval
-    interval(pollInterval).pipe(mapTo(none))
+    pollInterval$.pipe(
+      switchMap((period) => {
+        logger.debug(`Task poller now using interval of ${period}ms`);
+        return interval(period);
+      }),
+      mapTo(none)
+    )
   ).pipe(
     // buffer all requests in a single set (to remove duplicates) as we don't want
     // work to take place in parallel (it could cause Task Manager to pull in the same
@@ -95,7 +104,7 @@
           await promiseResult(
             timeoutPromiseAfter(
               work(...pullFromSet(set, getCapacity())),
-              workTimeout ?? pollInterval,
+              workTimeout,
               () => new Error(`work has timed out`)
             )
           ),
diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts
index fb2d5e07030a4..cc611e124ea7b 100644
--- a/x-pack/plugins/task_manager/server/task_manager.ts
+++ b/x-pack/plugins/task_manager/server/task_manager.ts
@@ -17,6 +17,7 @@
   ISavedObjectsRepository,
 } from '../../../../src/core/server';
 import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
+import { createManagedConfiguration } from './lib/create_managed_configuration';
 import { TaskManagerConfig } from './config';
 import { Logger } from './types';
@@ -149,6 +150,13 @@
     // pipe store events into the TaskManager's event stream
     this.store.events.subscribe((event) => this.events$.next(event));
 
+    const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
+      logger: this.logger,
+      errors$: this.store.errors$,
+      startingMaxWorkers: opts.config.max_workers,
+      startingPollInterval: opts.config.poll_interval,
+    });
+
     this.bufferedStore = new BufferedTaskStore(this.store, {
       bufferMaxOperations: opts.config.max_workers,
       logger: this.logger,
@@ -156,7 +164,7 @@
 
     this.pool = new TaskPool({
       logger: this.logger,
-      maxWorkers: opts.config.max_workers,
+      maxWorkers$: maxWorkersConfiguration$,
     });
 
     const {
@@ -166,7 +174,8 @@
     this.poller$ = createObservableMonitor<Result<FillPoolResult, PollingError<string>>, Error>(
      () =>
        createTaskPoller({
-          pollInterval,
+          logger: this.logger,
+          pollInterval$: pollIntervalConfiguration$,
           bufferCapacity: opts.config.request_capacity,
           getCapacity: () => this.pool.availableWorkers,
           pollRequests$: this.claimRequests$,
diff --git a/x-pack/plugins/task_manager/server/task_pool.test.ts b/x-pack/plugins/task_manager/server/task_pool.test.ts
index 8b2bce455589e..12b731b2b78ae 100644
--- a/x-pack/plugins/task_manager/server/task_pool.test.ts
+++ b/x-pack/plugins/task_manager/server/task_pool.test.ts
@@ -5,6 +5,7 @@
  */
 
 import sinon from 'sinon';
+import { of, Subject } from 'rxjs';
 import { TaskPool, TaskPoolRunResult } from './task_pool';
 import { mockLogger, resolvable, sleep } from './test_utils';
 import { asOk } from './lib/result_type';
@@ -14,7 +15,7 @@
 import moment from 'moment';
 
 describe('TaskPool', () => {
   test('occupiedWorkers are a sum of running tasks', async () => {
     const pool = new TaskPool({
-      maxWorkers: 200,
+      maxWorkers$: of(200),
       logger: mockLogger(),
     });
 
@@ -26,7 +27,7 @@
 
   test('availableWorkers are a function of total_capacity - occupiedWorkers', async () => {
     const pool = new TaskPool({
-      maxWorkers: 10,
+      maxWorkers$: of(10),
       logger: mockLogger(),
     });
 
@@ -36,9 +37,21 @@
     expect(pool.availableWorkers).toEqual(7);
   });
 
+  test('availableWorkers is 0 until maxWorkers$ pushes a value', async () => {
+    const maxWorkers$ = new Subject<number>();
+    const pool = new TaskPool({
+      maxWorkers$,
+      logger: mockLogger(),
+    });
+
+    expect(pool.availableWorkers).toEqual(0);
+    maxWorkers$.next(10);
+    expect(pool.availableWorkers).toEqual(10);
+  });
+
   test('does not run tasks that are beyond its available capacity', async () => {
     const pool = new TaskPool({
-      maxWorkers: 2,
+      maxWorkers$: of(2),
       logger: mockLogger(),
     });
 
@@ -60,7 +73,7 @@
   test('should log when marking a Task as running fails', async () => {
     const logger = mockLogger();
     const pool = new TaskPool({
-      maxWorkers: 2,
+      maxWorkers$: of(2),
      logger,
     });
 
@@ -83,7 +96,7 @@
   test('should log when running a Task fails', async () => {
     const logger = mockLogger();
     const pool = new TaskPool({
-      maxWorkers: 3,
+      maxWorkers$: of(3),
       logger,
     });
 
@@ -106,7 +119,7 @@
   test('should not log when running a Task fails due to the Task SO having been deleted while in flight', async () => {
     const logger = mockLogger();
     const pool = new TaskPool({
-      maxWorkers: 3,
+      maxWorkers$: of(3),
       logger,
     });
 
@@ -117,11 +130,9 @@
 
     const result = await pool.run([mockTask(), taskFailedToRun, mockTask()]);
 
-    expect(logger.debug.mock.calls[0]).toMatchInlineSnapshot(`
-      Array [
-        "Task TaskType \\"shooooo\\" failed in attempt to run: Saved object [task/foo] not found",
-      ]
-    `);
+    expect(logger.debug).toHaveBeenCalledWith(
+      'Task TaskType "shooooo" failed in attempt to run: Saved object [task/foo] not found'
+    );
     expect(logger.warn).not.toHaveBeenCalled();
 
     expect(result).toEqual(TaskPoolRunResult.RunningAllClaimedTasks);
@@ -130,7 +141,7 @@
   test('Running a task which fails still takes up capacity', async () => {
     const logger = mockLogger();
     const pool = new TaskPool({
-      maxWorkers: 1,
+      maxWorkers$: of(1),
       logger,
     });
 
@@ -147,7 +158,7 @@
   test('clears up capacity when a task completes', async () => {
     const pool = new TaskPool({
-      maxWorkers: 1,
+      maxWorkers$: of(1),
       logger: mockLogger(),
     });
 
@@ -193,7 +204,7 @@
   test('run cancels expired tasks prior to running new tasks', async () => {
     const logger = mockLogger();
     const pool = new TaskPool({
-      maxWorkers: 2,
+      maxWorkers$: of(2),
       logger,
     });
 
@@ -251,7 +262,7 @@
     const logger = mockLogger();
     const pool = new TaskPool({
       logger,
-      maxWorkers: 20,
+      maxWorkers$: of(20),
     });
 
     const cancelled = resolvable();
diff --git a/x-pack/plugins/task_manager/server/task_pool.ts b/x-pack/plugins/task_manager/server/task_pool.ts
index 92374908c60f7..44f5f5648c2ac 100644
--- a/x-pack/plugins/task_manager/server/task_pool.ts
+++ b/x-pack/plugins/task_manager/server/task_pool.ts
@@ -8,6 +8,7 @@
  * This module contains the logic that ensures we don't run too many
  * tasks at once in a given Kibana instance.
  */
+import { Observable } from 'rxjs';
 import moment, { Duration } from 'moment';
 import { performance } from 'perf_hooks';
 import { padStart } from 'lodash';
@@ -16,7 +17,7 @@
 import { TaskRunner } from './task_runner';
 import { isTaskSavedObjectNotFoundError } from './lib/is_task_not_found_error';
 
 interface Opts {
-  maxWorkers: number;
+  maxWorkers$: Observable<number>;
   logger: Logger;
 }
 
@@ -31,7 +32,7 @@ const VERSION_CONFLICT_MESSAGE = 'Task has been claimed by another Kibana servic
 * Runs tasks in batches, taking costs into account.
 */
 export class TaskPool {
-  private maxWorkers: number;
+  private maxWorkers: number = 0;
   private running = new Set<TaskRunner>();
   private logger: Logger;
 
@@ -44,8 +45,11 @@
    * @prop {Logger} logger - The task manager logger.
    */
   constructor(opts: Opts) {
-    this.maxWorkers = opts.maxWorkers;
     this.logger = opts.logger;
+    opts.maxWorkers$.subscribe((maxWorkers) => {
+      this.logger.debug(`Task pool now using ${maxWorkers} as the max worker value`);
+      this.maxWorkers = maxWorkers;
+    });
   }
 
   /**
diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts
index f5fafe83748d9..5a3ee12d593c9 100644
--- a/x-pack/plugins/task_manager/server/task_store.test.ts
+++ b/x-pack/plugins/task_manager/server/task_store.test.ts
@@ -7,7 +7,7 @@
 import _ from 'lodash';
 import sinon from 'sinon';
 import uuid from 'uuid';
-import { filter, take } from 'rxjs/operators';
+import { filter, take, first } from 'rxjs/operators';
 import { Option, some, none } from 'fp-ts/lib/Option';
 
 import {
@@ -66,8 +66,21 @@
 const mockedDate = new Date('2019-02-12T21:01:22.479Z');
 
 describe('TaskStore', () => {
   describe('schedule', () => {
+    let store: TaskStore;
+
+    beforeAll(() => {
+      store = new TaskStore({
+        index: 'tasky',
+        taskManagerId: '',
+        serializer,
+        callCluster: jest.fn(),
+        maxAttempts: 2,
+        definitions: taskDefinitions,
+        savedObjectsRepository: savedObjectsClient,
+      });
+    });
+
     async function testSchedule(task: unknown) {
-      const callCluster = jest.fn();
       savedObjectsClient.create.mockImplementation(async (type: string, attributes: unknown) => ({
         id: 'testid',
         type,
@@ -75,15 +88,6 @@
         references: [],
         version: '123',
       }));
-      const store = new TaskStore({
-        index: 'tasky',
-        taskManagerId: '',
-        serializer,
-        callCluster,
-        maxAttempts: 2,
-        definitions: taskDefinitions,
-        savedObjectsRepository: savedObjectsClient,
-      });
 
       const result = await store.schedule(task as TaskInstance);
 
       expect(savedObjectsClient.create).toHaveBeenCalledTimes(1);
@@ -176,12 +180,28 @@
         /Unsupported task type "nope"/i
       );
     });
+
+    test('pushes error from saved objects client to errors$', async () => {
+      const task: TaskInstance = {
+        id: 'id',
+        params: { hello: 'world' },
+        state: { foo: 'bar' },
+        taskType: 'report',
+      };
+
+      const firstErrorPromise = store.errors$.pipe(first()).toPromise();
+      savedObjectsClient.create.mockRejectedValue(new Error('Failure'));
+      await expect(store.schedule(task)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
+      expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
+    });
   });
 
   describe('fetch', () => {
-    async function testFetch(opts?: SearchOpts, hits: unknown[] = []) {
-      const callCluster = sinon.spy(async (name: string, params?: unknown) => ({ hits: { hits } }));
-      const store = new TaskStore({
+    let store: TaskStore;
+    const callCluster = jest.fn();
+
+    beforeAll(() => {
+      store = new TaskStore({
         index: 'tasky',
         taskManagerId: '',
         serializer,
@@ -190,15 +210,19 @@
         definitions: taskDefinitions,
         savedObjectsRepository: savedObjectsClient,
       });
+    });
+
+    async function testFetch(opts?: SearchOpts, hits: unknown[] = []) {
+      callCluster.mockResolvedValue({ hits: { hits } });
 
       const result = await store.fetch(opts);
 
-      sinon.assert.calledOnce(callCluster);
-      sinon.assert.calledWith(callCluster, 'search');
+      expect(callCluster).toHaveBeenCalledTimes(1);
+      expect(callCluster).toHaveBeenCalledWith('search', expect.anything());
 
       return {
         result,
-        args: callCluster.args[0][1],
+        args: callCluster.mock.calls[0][1],
       };
     }
 
@@ -230,6 +254,13 @@
         },
       });
     });
+
+    test('pushes error from call cluster to errors$', async () => {
+      const firstErrorPromise = store.errors$.pipe(first()).toPromise();
+      callCluster.mockRejectedValue(new Error('Failure'));
+      await expect(store.fetch()).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
+      expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
+    });
   });
 
   describe('claimAvailableTasks', () => {
@@ -928,9 +959,46 @@ if (doc['task.runAt'].size()!=0) {
       },
     ]);
   });
+
+  test('pushes error from saved objects client to errors$', async () => {
+    const callCluster = jest.fn();
+    const store = new TaskStore({
+      index: 'tasky',
+      taskManagerId: '',
+      serializer,
+      callCluster,
+      definitions: taskDefinitions,
+      maxAttempts: 2,
+      savedObjectsRepository: savedObjectsClient,
+    });
+
+    const firstErrorPromise = store.errors$.pipe(first()).toPromise();
+    callCluster.mockRejectedValue(new Error('Failure'));
+    await expect(
+      store.claimAvailableTasks({
+        claimOwnershipUntil: new Date(),
+        size: 10,
+      })
+    ).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
+    expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
+  });
 });
 
 describe('update', () => {
+    let store: TaskStore;
+
+    beforeAll(() => {
+      store = new TaskStore({
+        index: 'tasky',
+        taskManagerId: '',
+        serializer,
+        callCluster: jest.fn(),
+        maxAttempts: 2,
+        definitions: taskDefinitions,
+        savedObjectsRepository: savedObjectsClient,
+      });
+    });
+
    test('refreshes the index, handles versioning', async () => {
      const task = {
        runAt: mockedDate,
@@ -959,16 +1027,6 @@
         }
       );
 
-      const store = new TaskStore({
-        index: 'tasky',
-        taskManagerId: '',
-        serializer,
-        callCluster: jest.fn(),
-        maxAttempts: 2,
-        definitions: taskDefinitions,
-        savedObjectsRepository: savedObjectsClient,
-      });
-
       const result = await store.update(task);
 
       expect(savedObjectsClient.update).toHaveBeenCalledWith(
@@ -1002,28 +1060,116 @@
         version: '123',
       });
     });
+
+    test('pushes error from saved objects client to errors$', async () => {
+      const task = {
+        runAt: mockedDate,
+        scheduledAt: mockedDate,
+        startedAt: null,
+        retryAt: null,
+        id: 'task:324242',
+        params: { hello: 'world' },
+        state: { foo: 'bar' },
+        taskType: 'report',
+        attempts: 3,
+        status: 'idle' as TaskStatus,
+        version: '123',
+        ownerId: null,
+      };
+
+      const firstErrorPromise = store.errors$.pipe(first()).toPromise();
+      savedObjectsClient.update.mockRejectedValue(new Error('Failure'));
+      await expect(store.update(task)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
+      expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
+    });
+  });
+
+  describe('bulkUpdate', () => {
+    let store: TaskStore;
+
+    beforeAll(() => {
+      store = new TaskStore({
+        index: 'tasky',
+        taskManagerId: '',
+        serializer,
+        callCluster: jest.fn(),
+        maxAttempts: 2,
+        definitions: taskDefinitions,
+        savedObjectsRepository: savedObjectsClient,
+      });
+    });
+
+    test('pushes error from saved objects client to errors$', async () => {
+      const task = {
+        runAt: mockedDate,
+        scheduledAt: mockedDate,
+        startedAt: null,
+        retryAt: null,
+        id: 'task:324242',
+        params: { hello: 'world' },
+        state: { foo: 'bar' },
+        taskType: 'report',
+        attempts: 3,
+        status: 'idle' as TaskStatus,
+        version: '123',
+        ownerId: null,
+      };
+
+      const firstErrorPromise = store.errors$.pipe(first()).toPromise();
+      savedObjectsClient.bulkUpdate.mockRejectedValue(new Error('Failure'));
+      await expect(store.bulkUpdate([task])).rejects.toThrowErrorMatchingInlineSnapshot(
+        `"Failure"`
+      );
+      expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
+    });
   });
 
   describe('remove', () => {
-    test('removes the task with the specified id', async () => {
-      const id = `id-${_.random(1, 20)}`;
-      const callCluster = jest.fn();
-      const store = new TaskStore({
+    let store: TaskStore;
+
+    beforeAll(() => {
+      store = new TaskStore({
         index: 'tasky',
         taskManagerId: '',
         serializer,
-        callCluster,
+        callCluster: jest.fn(),
         maxAttempts: 2,
         definitions: taskDefinitions,
         savedObjectsRepository: savedObjectsClient,
       });
+    });
+
+    test('removes the task with the specified id', async () => {
+      const id = `id-${_.random(1, 20)}`;
 
       const result = await store.remove(id);
       expect(result).toBeUndefined();
       expect(savedObjectsClient.delete).toHaveBeenCalledWith('task', id);
     });
+
+    test('pushes error from saved objects client to errors$', async () => {
+      const id = `id-${_.random(1, 20)}`;
+      const firstErrorPromise = store.errors$.pipe(first()).toPromise();
+      savedObjectsClient.delete.mockRejectedValue(new Error('Failure'));
+      await expect(store.remove(id)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
+      expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
+    });
   });
 
   describe('get', () => {
+    let store: TaskStore;
+
+    beforeAll(() => {
+      store = new TaskStore({
+        index: 'tasky',
+        taskManagerId: '',
+        serializer,
+        callCluster: jest.fn(),
+        maxAttempts: 2,
+        definitions: taskDefinitions,
+        savedObjectsRepository: savedObjectsClient,
+      });
+    });
+
     test('gets the task with the specified id', async () => {
       const id = `id-${_.random(1, 20)}`;
       const task = {
@@ -1041,7 +1187,6 @@
         ownerId: null,
       };
 
-      const callCluster = jest.fn();
       savedObjectsClient.get.mockImplementation(async (type: string, objectId: string) => ({
         id: objectId,
         type,
@@ -1053,22 +1198,20 @@
         version: '123',
       }));
 
-      const store = new TaskStore({
-        index: 'tasky',
-        taskManagerId: '',
-        serializer,
-        callCluster,
-        maxAttempts: 2,
-        definitions: taskDefinitions,
-        savedObjectsRepository: savedObjectsClient,
-      });
-
       const result = await store.get(id);
 
       expect(result).toEqual(task);
 
       expect(savedObjectsClient.get).toHaveBeenCalledWith('task', id);
     });
+
+    test('pushes error from saved objects client to errors$', async () => {
+      const id = `id-${_.random(1, 20)}`;
+      const firstErrorPromise = store.errors$.pipe(first()).toPromise();
+      savedObjectsClient.get.mockRejectedValue(new Error('Failure'));
+      await expect(store.get(id)).rejects.toThrowErrorMatchingInlineSnapshot(`"Failure"`);
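+      // the rejection should also be mirrored onto the errors$ stream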
+      expect(await firstErrorPromise).toMatchInlineSnapshot(`[Error: Failure]`);
+    });
   });
 
   describe('getLifecycle', () => {
diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts
index acd19bd75f7a3..15261be3d89ae 100644
--- a/x-pack/plugins/task_manager/server/task_store.ts
+++ b/x-pack/plugins/task_manager/server/task_store.ts
@@ -121,6 +121,7 @@ export class TaskStore {
   public readonly maxAttempts: number;
   public readonly index: string;
   public readonly taskManagerId: string;
+  public readonly errors$ = new Subject<Error>();
 
   private callCluster: ElasticJs;
   private definitions: TaskDictionary<TaskDefinition>;
@@ -171,11 +172,17 @@
       );
     }
 
-    const savedObject = await this.savedObjectsRepository.create(
-      'task',
-      taskInstanceToAttributes(taskInstance),
-      { id: taskInstance.id, refresh: false }
-    );
+    let savedObject;
+    try {
+      savedObject = await this.savedObjectsRepository.create(
+        'task',
+        taskInstanceToAttributes(taskInstance),
+        { id: taskInstance.id, refresh: false }
+      );
+    } catch (e) {
+      this.errors$.next(e);
+      throw e;
+    }
 
     return savedObjectToConcreteTaskInstance(savedObject);
   }
@@ -333,12 +340,22 @@
    */
   public async update(doc: ConcreteTaskInstance): Promise<ConcreteTaskInstance> {
     const attributes = taskInstanceToAttributes(doc);
-    const updatedSavedObject = await this.savedObjectsRepository.update<
-      SerializedConcreteTaskInstance
-    >('task', doc.id, attributes, {
-      refresh: false,
-      version: doc.version,
-    });
+
+    let updatedSavedObject;
+    try {
+      updatedSavedObject = await this.savedObjectsRepository.update<SerializedConcreteTaskInstance>(
+        'task',
+        doc.id,
+        attributes,
+        {
+          refresh: false,
+          version: doc.version,
+        }
+      );
+    } catch (e) {
+      this.errors$.next(e);
+      throw e;
+    }
 
     return savedObjectToConcreteTaskInstance(
       // The SavedObjects update api forces a Partial on the `attributes` on the response,
@@ -362,8 +379,11 @@
       return attrsById;
     }, new Map());
 
-    const updatedSavedObjects: Array<SavedObjectsUpdateResponse | Error> = (
-      await this.savedObjectsRepository.bulkUpdate<SerializedConcreteTaskInstance>(
+    let updatedSavedObjects: Array<SavedObjectsUpdateResponse | Error>;
+    try {
+      ({ saved_objects: updatedSavedObjects } = await this.savedObjectsRepository.bulkUpdate<
+        SerializedConcreteTaskInstance
+      >(
         docs.map((doc) => ({
           type: 'task',
           id: doc.id,
@@ -373,8 +393,11 @@
         {
           refresh: false,
         }
-      )
-    ).saved_objects;
+      ));
+    } catch (e) {
+      this.errors$.next(e);
+      throw e;
+    }
 
     return updatedSavedObjects.map((updatedSavedObject, index) =>
       isSavedObjectsUpdateResponse(updatedSavedObject)
@@ -404,7 +427,12 @@
    * @returns {Promise<void>}
    */
   public async remove(id: string): Promise<void> {
-    await this.savedObjectsRepository.delete('task', id);
+    try {
+      await this.savedObjectsRepository.delete('task', id);
+    } catch (e) {
+      this.errors$.next(e);
+      throw e;
+    }
   }
 
   /**
@@ -414,7 +442,14 @@
    * @returns {Promise<ConcreteTaskInstance>}
    */
   public async get(id: string): Promise<ConcreteTaskInstance> {
-    return savedObjectToConcreteTaskInstance(await this.savedObjectsRepository.get('task', id));
+    let result;
+    try {
+      result = await this.savedObjectsRepository.get('task', id);
+    } catch (e) {
+      this.errors$.next(e);
+      throw e;
+    }
+    return savedObjectToConcreteTaskInstance(result);
   }
 
   /**
@@ -438,14 +473,20 @@
   private async search(opts: SearchOpts = {}): Promise<FetchResult> {
     const { query } = ensureQueryOnlyReturnsTaskObjects(opts);
 
-    const result = await this.callCluster('search', {
-      index: this.index,
-      ignoreUnavailable: true,
-      body: {
-        ...opts,
-        query,
-      },
-    });
+    let result;
+    try {
+      result = await this.callCluster('search', {
+        index: this.index,
+        ignoreUnavailable: true,
+        body: {
+          ...opts,
+          query,
+        },
+      });
+    } catch (e) {
+      this.errors$.next(e);
+      throw e;
+    }
 
     const rawDocs = (result as SearchResponse<unknown>).hits.hits;
 
@@ -464,17 +505,23 @@
     { max_docs }: UpdateByQueryOpts = {}
   ): Promise<UpdateByQueryResult> {
     const { query } = ensureQueryOnlyReturnsTaskObjects(opts);
-    const result = await this.callCluster('updateByQuery', {
-      index: this.index,
-      ignoreUnavailable: true,
-      refresh: true,
-      max_docs,
-      conflicts: 'proceed',
-      body: {
-        ...opts,
-        query,
-      },
-    });
+    let result;
+    try {
+      result = await this.callCluster('updateByQuery', {
+        index: this.index,
+        ignoreUnavailable: true,
+        refresh: true,
+        max_docs,
+        conflicts: 'proceed',
+        body: {
+          ...opts,
+          query,
+        },
+      });
+    } catch (e) {
+      this.errors$.next(e);
+      throw e;
+    }
 
     // eslint-disable-next-line @typescript-eslint/naming-convention
     const { total, updated, version_conflicts } = result as UpdateDocumentByQueryResponse;