Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
ecf3313
Make task manager maxWorkers and pollInterval observables (#75293)
mikecote Aug 21, 2020
afbedee
Merge with upstream
mikecote Aug 21, 2020
9d8393b
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Aug 21, 2020
a8dccf8
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Aug 24, 2020
a038e24
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Aug 31, 2020
71ab831
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 2, 2020
5a1f7ad
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 3, 2020
32468ad
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 8, 2020
d721fea
Make the task manager store emit error events (#75679)
mikecote Sep 9, 2020
2151dd9
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 9, 2020
e0a9fb5
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 14, 2020
5ba478d
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 16, 2020
f7ffe20
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 17, 2020
375d729
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 21, 2020
76c15a5
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 22, 2020
adc3f9f
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 24, 2020
36c717f
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 28, 2020
d989059
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 29, 2020
aa787b6
Temporarily apply back pressure to maxWorkers and pollInterval when 4…
mikecote Sep 30, 2020
3cd8145
Merge branch 'master' of github.com:elastic/kibana into feature/task_…
mikecote Sep 30, 2020
509542d
Fix merge conflicts
mikecote Oct 2, 2020
5f3847c
Merge branch 'master' into feature/task_manager_429
kibanamachine Oct 6, 2020
8c81a12
replace startsWith with a timer that is scheduled to 0
gmmorris Oct 8, 2020
574e523
Merge branch 'feature/task_manager_429' of github.com:elastic/kibana …
gmmorris Oct 8, 2020
423b8ad
typo
gmmorris Oct 8, 2020
e9dbd08
Merge branch 'master' into feature/task_manager_429
kibanamachine Oct 8, 2020
3fa42d8
Merge branch 'master' into feature/task_manager_429
kibanamachine Oct 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import sinon from 'sinon';
import { mockLogger } from '../test_utils';
import { TaskManager } from '../task_manager';
import { savedObjectsRepositoryMock } from '../../../../../src/core/server/mocks';
import {
SavedObjectsSerializer,
SavedObjectTypeRegistry,
SavedObjectsErrorHelpers,
} from '../../../../../src/core/server';
import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration';

describe('managed configuration', () => {
let taskManager: TaskManager;
let clock: sinon.SinonFakeTimers;
const callAsInternalUser = jest.fn();
const logger = mockLogger();
const serializer = new SavedObjectsSerializer(new SavedObjectTypeRegistry());
const savedObjectsClient = savedObjectsRepositoryMock.create();
const config = {
enabled: true,
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
};

beforeEach(() => {
jest.resetAllMocks();
callAsInternalUser.mockResolvedValue({ total: 0, updated: 0, version_conflicts: 0 });
clock = sinon.useFakeTimers();
taskManager = new TaskManager({
config,
logger,
serializer,
callAsInternalUser,
taskManagerId: 'some-uuid',
savedObjectsRepository: savedObjectsClient,
});
taskManager.registerTaskDefinitions({
foo: {
type: 'foo',
title: 'Foo',
createTaskRunner: jest.fn(),
},
});
taskManager.start();
// force rxjs timers to fire when they are scheduled for setTimeout(0) as the
// sinon fake timers cause them to stall
clock.tick(0);
});

afterEach(() => clock.restore());

test('should lower max workers when Elasticsearch returns 429 error', async () => {
savedObjectsClient.create.mockRejectedValueOnce(
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
);
// Cause "too many requests" error to be thrown
await expect(
taskManager.schedule({
taskType: 'foo',
state: {},
params: {},
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Max workers configuration changing from 10 to 8 after seeing 1 error(s)'
);
expect(logger.debug).toHaveBeenCalledWith('Task pool now using 10 as the max worker value');
});

test('should increase poll interval when Elasticsearch returns 429 error', async () => {
savedObjectsClient.create.mockRejectedValueOnce(
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
);
// Cause "too many requests" error to be thrown
await expect(
taskManager.schedule({
taskType: 'foo',
state: {},
params: {},
})
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
);
expect(logger.debug).toHaveBeenCalledWith(
'Poll interval configuration changing from 3000 to 3600 after seeing 1 error(s)'
);
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms');
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import sinon from 'sinon';
import { Subject } from 'rxjs';
import { mockLogger } from '../test_utils';
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
import {
createManagedConfiguration,
ADJUST_THROUGHPUT_INTERVAL,
} from './create_managed_configuration';

describe('createManagedConfiguration()', () => {
let clock: sinon.SinonFakeTimers;
const logger = mockLogger();

beforeEach(() => {
jest.resetAllMocks();
clock = sinon.useFakeTimers();
});

afterEach(() => clock.restore());

test('returns observables with initialized values', async () => {
const maxWorkersSubscription = jest.fn();
const pollIntervalSubscription = jest.fn();
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
logger,
errors$: new Subject<Error>(),
startingMaxWorkers: 1,
startingPollInterval: 2,
});
maxWorkersConfiguration$.subscribe(maxWorkersSubscription);
pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
expect(maxWorkersSubscription).toHaveBeenCalledTimes(1);
expect(maxWorkersSubscription).toHaveBeenNthCalledWith(1, 1);
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
expect(pollIntervalSubscription).toHaveBeenNthCalledWith(1, 2);
});

test(`skips errors that aren't about too many requests`, async () => {
const maxWorkersSubscription = jest.fn();
const pollIntervalSubscription = jest.fn();
const errors$ = new Subject<Error>();
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
errors$,
logger,
startingMaxWorkers: 100,
startingPollInterval: 100,
});
maxWorkersConfiguration$.subscribe(maxWorkersSubscription);
pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
errors$.next(new Error('foo'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(maxWorkersSubscription).toHaveBeenCalledTimes(1);
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
});

describe('maxWorker configuration', () => {
function setupScenario(startingMaxWorkers: number) {
const errors$ = new Subject<Error>();
const subscription = jest.fn();
const { maxWorkersConfiguration$ } = createManagedConfiguration({
errors$,
startingMaxWorkers,
logger,
startingPollInterval: 1,
});
maxWorkersConfiguration$.subscribe(subscription);
return { subscription, errors$ };
}

beforeEach(() => {
jest.resetAllMocks();
clock = sinon.useFakeTimers();
});

afterEach(() => clock.restore());

test('should decrease configuration at the next interval when an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 80);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
);
});

test('should increase configuration back to normal incrementally after an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
expect(subscription).toHaveBeenNthCalledWith(2, 80);
expect(subscription).toHaveBeenNthCalledWith(3, 84);
// 88.2- > 89 from Math.ceil
expect(subscription).toHaveBeenNthCalledWith(4, 89);
expect(subscription).toHaveBeenNthCalledWith(5, 94);
expect(subscription).toHaveBeenNthCalledWith(6, 99);
// 103.95 -> 100 from Math.min with starting value
expect(subscription).toHaveBeenNthCalledWith(7, 100);
// No new calls due to value not changing and usage of distinctUntilChanged()
expect(subscription).toHaveBeenCalledTimes(7);
});

test('should keep reducing configuration when errors keep emitting', async () => {
const { subscription, errors$ } = setupScenario(100);
for (let i = 0; i < 20; i++) {
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
}
expect(subscription).toHaveBeenNthCalledWith(2, 80);
expect(subscription).toHaveBeenNthCalledWith(3, 64);
// 51.2 -> 51 from Math.floor
expect(subscription).toHaveBeenNthCalledWith(4, 51);
expect(subscription).toHaveBeenNthCalledWith(5, 40);
expect(subscription).toHaveBeenNthCalledWith(6, 32);
expect(subscription).toHaveBeenNthCalledWith(7, 25);
expect(subscription).toHaveBeenNthCalledWith(8, 20);
expect(subscription).toHaveBeenNthCalledWith(9, 16);
expect(subscription).toHaveBeenNthCalledWith(10, 12);
expect(subscription).toHaveBeenNthCalledWith(11, 9);
expect(subscription).toHaveBeenNthCalledWith(12, 7);
expect(subscription).toHaveBeenNthCalledWith(13, 5);
expect(subscription).toHaveBeenNthCalledWith(14, 4);
expect(subscription).toHaveBeenNthCalledWith(15, 3);
expect(subscription).toHaveBeenNthCalledWith(16, 2);
expect(subscription).toHaveBeenNthCalledWith(17, 1);
// No new calls due to value not changing and usage of distinctUntilChanged()
expect(subscription).toHaveBeenCalledTimes(17);
});
});

describe('pollInterval configuration', () => {
function setupScenario(startingPollInterval: number) {
const errors$ = new Subject<Error>();
const subscription = jest.fn();
const { pollIntervalConfiguration$ } = createManagedConfiguration({
logger,
errors$,
startingPollInterval,
startingMaxWorkers: 1,
});
pollIntervalConfiguration$.subscribe(subscription);
return { subscription, errors$ };
}

beforeEach(() => {
jest.resetAllMocks();
clock = sinon.useFakeTimers();
});

afterEach(() => clock.restore());

test('should increase configuration at the next interval when an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 120);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
);
});

test('should decrease configuration back to normal incrementally after an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(100);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
expect(subscription).toHaveBeenNthCalledWith(2, 120);
expect(subscription).toHaveBeenNthCalledWith(3, 114);
// 108.3 -> 108 from Math.floor
expect(subscription).toHaveBeenNthCalledWith(4, 108);
expect(subscription).toHaveBeenNthCalledWith(5, 102);
// 96.9 -> 100 from Math.max with the starting value
expect(subscription).toHaveBeenNthCalledWith(6, 100);
// No new calls due to value not changing and usage of distinctUntilChanged()
expect(subscription).toHaveBeenCalledTimes(6);
});

test('should increase configuration when errors keep emitting', async () => {
const { subscription, errors$ } = setupScenario(100);
for (let i = 0; i < 3; i++) {
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
}
expect(subscription).toHaveBeenNthCalledWith(2, 120);
expect(subscription).toHaveBeenNthCalledWith(3, 144);
// 172.8 -> 173 from Math.ceil
expect(subscription).toHaveBeenNthCalledWith(4, 173);
});
});
});
Loading