Skip to content

Commit c6f9059

Browse files
mikecotekibanamachinegmmorris
committed
Apply back pressure in Task Manager whenever Elasticsearch responds with a 429 (#75666)
* Make task manager maxWorkers and pollInterval observables (#75293) * WIP step 1 * WIP step 2 * Cleanup * Make maxWorkers an observable for the task pool * Cleanup * Fix test failures * Use BehaviorSubject * Add some tests * Make the task manager store emit error events (#75679) * Add errors$ observable to the task store * Add unit tests * Temporarily apply back pressure to maxWorkers and pollInterval when 429 errors occur (#77096) * WIP * Cleanup * Add error count to message * Reset observable values on stop * Add comments * Fix issues when changing configurations * Cleanup code * Cleanup pt2 * Some renames * Fix typecheck * Use observables to manage throughput * Rename class * Switch to createManagedConfiguration * Add some comments * Start unit tests * Add logs * Fix log level * Attempt at adding integration tests * Fix test failures * Fix timer * Revert "Fix timer" This reverts commit 0817e5e. * Use Symbol * Fix merge scan * replace startsWith with a timer that is scheduled to 0 * typo Co-authored-by: Kibana Machine <[email protected]> Co-authored-by: Gidi Meir Morris <[email protected]>
1 parent 867232b commit c6f9059

File tree

11 files changed

+882
-120
lines changed

11 files changed

+882
-120
lines changed
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
import sinon from 'sinon';
8+
import { mockLogger } from '../test_utils';
9+
import { TaskManager } from '../task_manager';
10+
import { savedObjectsRepositoryMock } from '../../../../../src/core/server/mocks';
11+
import {
12+
SavedObjectsSerializer,
13+
SavedObjectTypeRegistry,
14+
SavedObjectsErrorHelpers,
15+
} from '../../../../../src/core/server';
16+
import { ADJUST_THROUGHPUT_INTERVAL } from '../lib/create_managed_configuration';
17+
18+
describe('managed configuration', () => {
19+
let taskManager: TaskManager;
20+
let clock: sinon.SinonFakeTimers;
21+
const callAsInternalUser = jest.fn();
22+
const logger = mockLogger();
23+
const serializer = new SavedObjectsSerializer(new SavedObjectTypeRegistry());
24+
const savedObjectsClient = savedObjectsRepositoryMock.create();
25+
const config = {
26+
enabled: true,
27+
max_workers: 10,
28+
index: 'foo',
29+
max_attempts: 9,
30+
poll_interval: 3000,
31+
max_poll_inactivity_cycles: 10,
32+
request_capacity: 1000,
33+
};
34+
35+
beforeEach(() => {
36+
jest.resetAllMocks();
37+
callAsInternalUser.mockResolvedValue({ total: 0, updated: 0, version_conflicts: 0 });
38+
clock = sinon.useFakeTimers();
39+
taskManager = new TaskManager({
40+
config,
41+
logger,
42+
serializer,
43+
callAsInternalUser,
44+
taskManagerId: 'some-uuid',
45+
savedObjectsRepository: savedObjectsClient,
46+
});
47+
taskManager.registerTaskDefinitions({
48+
foo: {
49+
type: 'foo',
50+
title: 'Foo',
51+
createTaskRunner: jest.fn(),
52+
},
53+
});
54+
taskManager.start();
55+
// force rxjs timers to fire when they are scheduled for setTimeout(0) as the
56+
// sinon fake timers cause them to stall
57+
clock.tick(0);
58+
});
59+
60+
afterEach(() => clock.restore());
61+
62+
test('should lower max workers when Elasticsearch returns 429 error', async () => {
63+
savedObjectsClient.create.mockRejectedValueOnce(
64+
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
65+
);
66+
// Cause "too many requests" error to be thrown
67+
await expect(
68+
taskManager.schedule({
69+
taskType: 'foo',
70+
state: {},
71+
params: {},
72+
})
73+
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
74+
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
75+
expect(logger.warn).toHaveBeenCalledWith(
76+
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
77+
);
78+
expect(logger.debug).toHaveBeenCalledWith(
79+
'Max workers configuration changing from 10 to 8 after seeing 1 error(s)'
80+
);
81+
expect(logger.debug).toHaveBeenCalledWith('Task pool now using 10 as the max worker value');
82+
});
83+
84+
test('should increase poll interval when Elasticsearch returns 429 error', async () => {
85+
savedObjectsClient.create.mockRejectedValueOnce(
86+
SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')
87+
);
88+
// Cause "too many requests" error to be thrown
89+
await expect(
90+
taskManager.schedule({
91+
taskType: 'foo',
92+
state: {},
93+
params: {},
94+
})
95+
).rejects.toThrowErrorMatchingInlineSnapshot(`"Too Many Requests"`);
96+
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
97+
expect(logger.warn).toHaveBeenCalledWith(
98+
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
99+
);
100+
expect(logger.debug).toHaveBeenCalledWith(
101+
'Poll interval configuration changing from 3000 to 3600 after seeing 1 error(s)'
102+
);
103+
expect(logger.debug).toHaveBeenCalledWith('Task poller now using interval of 3600ms');
104+
});
105+
});
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
import sinon from 'sinon';
8+
import { Subject } from 'rxjs';
9+
import { mockLogger } from '../test_utils';
10+
import { SavedObjectsErrorHelpers } from '../../../../../src/core/server';
11+
import {
12+
createManagedConfiguration,
13+
ADJUST_THROUGHPUT_INTERVAL,
14+
} from './create_managed_configuration';
15+
16+
describe('createManagedConfiguration()', () => {
17+
let clock: sinon.SinonFakeTimers;
18+
const logger = mockLogger();
19+
20+
beforeEach(() => {
21+
jest.resetAllMocks();
22+
clock = sinon.useFakeTimers();
23+
});
24+
25+
afterEach(() => clock.restore());
26+
27+
test('returns observables with initialized values', async () => {
28+
const maxWorkersSubscription = jest.fn();
29+
const pollIntervalSubscription = jest.fn();
30+
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
31+
logger,
32+
errors$: new Subject<Error>(),
33+
startingMaxWorkers: 1,
34+
startingPollInterval: 2,
35+
});
36+
maxWorkersConfiguration$.subscribe(maxWorkersSubscription);
37+
pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
38+
expect(maxWorkersSubscription).toHaveBeenCalledTimes(1);
39+
expect(maxWorkersSubscription).toHaveBeenNthCalledWith(1, 1);
40+
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
41+
expect(pollIntervalSubscription).toHaveBeenNthCalledWith(1, 2);
42+
});
43+
44+
test(`skips errors that aren't about too many requests`, async () => {
45+
const maxWorkersSubscription = jest.fn();
46+
const pollIntervalSubscription = jest.fn();
47+
const errors$ = new Subject<Error>();
48+
const { maxWorkersConfiguration$, pollIntervalConfiguration$ } = createManagedConfiguration({
49+
errors$,
50+
logger,
51+
startingMaxWorkers: 100,
52+
startingPollInterval: 100,
53+
});
54+
maxWorkersConfiguration$.subscribe(maxWorkersSubscription);
55+
pollIntervalConfiguration$.subscribe(pollIntervalSubscription);
56+
errors$.next(new Error('foo'));
57+
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
58+
expect(maxWorkersSubscription).toHaveBeenCalledTimes(1);
59+
expect(pollIntervalSubscription).toHaveBeenCalledTimes(1);
60+
});
61+
62+
describe('maxWorker configuration', () => {
63+
function setupScenario(startingMaxWorkers: number) {
64+
const errors$ = new Subject<Error>();
65+
const subscription = jest.fn();
66+
const { maxWorkersConfiguration$ } = createManagedConfiguration({
67+
errors$,
68+
startingMaxWorkers,
69+
logger,
70+
startingPollInterval: 1,
71+
});
72+
maxWorkersConfiguration$.subscribe(subscription);
73+
return { subscription, errors$ };
74+
}
75+
76+
beforeEach(() => {
77+
jest.resetAllMocks();
78+
clock = sinon.useFakeTimers();
79+
});
80+
81+
afterEach(() => clock.restore());
82+
83+
test('should decrease configuration at the next interval when an error is emitted', async () => {
84+
const { subscription, errors$ } = setupScenario(100);
85+
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
86+
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
87+
expect(subscription).toHaveBeenCalledTimes(1);
88+
clock.tick(1);
89+
expect(subscription).toHaveBeenCalledTimes(2);
90+
expect(subscription).toHaveBeenNthCalledWith(2, 80);
91+
});
92+
93+
test('should log a warning when the configuration changes from the starting value', async () => {
94+
const { errors$ } = setupScenario(100);
95+
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
96+
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
97+
expect(logger.warn).toHaveBeenCalledWith(
98+
'Max workers configuration is temporarily reduced after Elasticsearch returned 1 "too many request" error(s).'
99+
);
100+
});
101+
102+
test('should increase configuration back to normal incrementally after an error is emitted', async () => {
103+
const { subscription, errors$ } = setupScenario(100);
104+
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
105+
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
106+
expect(subscription).toHaveBeenNthCalledWith(2, 80);
107+
expect(subscription).toHaveBeenNthCalledWith(3, 84);
108+
// 88.2- > 89 from Math.ceil
109+
expect(subscription).toHaveBeenNthCalledWith(4, 89);
110+
expect(subscription).toHaveBeenNthCalledWith(5, 94);
111+
expect(subscription).toHaveBeenNthCalledWith(6, 99);
112+
// 103.95 -> 100 from Math.min with starting value
113+
expect(subscription).toHaveBeenNthCalledWith(7, 100);
114+
// No new calls due to value not changing and usage of distinctUntilChanged()
115+
expect(subscription).toHaveBeenCalledTimes(7);
116+
});
117+
118+
test('should keep reducing configuration when errors keep emitting', async () => {
119+
const { subscription, errors$ } = setupScenario(100);
120+
for (let i = 0; i < 20; i++) {
121+
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
122+
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
123+
}
124+
expect(subscription).toHaveBeenNthCalledWith(2, 80);
125+
expect(subscription).toHaveBeenNthCalledWith(3, 64);
126+
// 51.2 -> 51 from Math.floor
127+
expect(subscription).toHaveBeenNthCalledWith(4, 51);
128+
expect(subscription).toHaveBeenNthCalledWith(5, 40);
129+
expect(subscription).toHaveBeenNthCalledWith(6, 32);
130+
expect(subscription).toHaveBeenNthCalledWith(7, 25);
131+
expect(subscription).toHaveBeenNthCalledWith(8, 20);
132+
expect(subscription).toHaveBeenNthCalledWith(9, 16);
133+
expect(subscription).toHaveBeenNthCalledWith(10, 12);
134+
expect(subscription).toHaveBeenNthCalledWith(11, 9);
135+
expect(subscription).toHaveBeenNthCalledWith(12, 7);
136+
expect(subscription).toHaveBeenNthCalledWith(13, 5);
137+
expect(subscription).toHaveBeenNthCalledWith(14, 4);
138+
expect(subscription).toHaveBeenNthCalledWith(15, 3);
139+
expect(subscription).toHaveBeenNthCalledWith(16, 2);
140+
expect(subscription).toHaveBeenNthCalledWith(17, 1);
141+
// No new calls due to value not changing and usage of distinctUntilChanged()
142+
expect(subscription).toHaveBeenCalledTimes(17);
143+
});
144+
});
145+
146+
describe('pollInterval configuration', () => {
147+
function setupScenario(startingPollInterval: number) {
148+
const errors$ = new Subject<Error>();
149+
const subscription = jest.fn();
150+
const { pollIntervalConfiguration$ } = createManagedConfiguration({
151+
logger,
152+
errors$,
153+
startingPollInterval,
154+
startingMaxWorkers: 1,
155+
});
156+
pollIntervalConfiguration$.subscribe(subscription);
157+
return { subscription, errors$ };
158+
}
159+
160+
beforeEach(() => {
161+
jest.resetAllMocks();
162+
clock = sinon.useFakeTimers();
163+
});
164+
165+
afterEach(() => clock.restore());
166+
167+
test('should increase configuration at the next interval when an error is emitted', async () => {
168+
const { subscription, errors$ } = setupScenario(100);
169+
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
170+
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
171+
expect(subscription).toHaveBeenCalledTimes(1);
172+
clock.tick(1);
173+
expect(subscription).toHaveBeenCalledTimes(2);
174+
expect(subscription).toHaveBeenNthCalledWith(2, 120);
175+
});
176+
177+
test('should log a warning when the configuration changes from the starting value', async () => {
178+
const { errors$ } = setupScenario(100);
179+
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
180+
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
181+
expect(logger.warn).toHaveBeenCalledWith(
182+
'Poll interval configuration is temporarily increased after Elasticsearch returned 1 "too many request" error(s).'
183+
);
184+
});
185+
186+
test('should decrease configuration back to normal incrementally after an error is emitted', async () => {
187+
const { subscription, errors$ } = setupScenario(100);
188+
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
189+
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
190+
expect(subscription).toHaveBeenNthCalledWith(2, 120);
191+
expect(subscription).toHaveBeenNthCalledWith(3, 114);
192+
// 108.3 -> 108 from Math.floor
193+
expect(subscription).toHaveBeenNthCalledWith(4, 108);
194+
expect(subscription).toHaveBeenNthCalledWith(5, 102);
195+
// 96.9 -> 100 from Math.max with the starting value
196+
expect(subscription).toHaveBeenNthCalledWith(6, 100);
197+
// No new calls due to value not changing and usage of distinctUntilChanged()
198+
expect(subscription).toHaveBeenCalledTimes(6);
199+
});
200+
201+
test('should increase configuration when errors keep emitting', async () => {
202+
const { subscription, errors$ } = setupScenario(100);
203+
for (let i = 0; i < 3; i++) {
204+
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
205+
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
206+
}
207+
expect(subscription).toHaveBeenNthCalledWith(2, 120);
208+
expect(subscription).toHaveBeenNthCalledWith(3, 144);
209+
// 172.8 -> 173 from Math.ceil
210+
expect(subscription).toHaveBeenNthCalledWith(4, 173);
211+
});
212+
});
213+
});

0 commit comments

Comments
 (0)