Skip to content

Commit ecf3313

Browse files
authored
Make task manager maxWorkers and pollInterval observables (#75293)
* WIP step 1 * WIP step 2 * Cleanup * Make maxWorkers an observable for the task pool * Cleanup * Fix test failures * Use BehaviorSubject * Add some tests
1 parent 7376e4c commit ecf3313

File tree

5 files changed

+105
-33
lines changed

5 files changed

+105
-33
lines changed

x-pack/plugins/task_manager/server/polling/task_poller.test.ts

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
*/
66

77
import _ from 'lodash';
8-
import { Subject } from 'rxjs';
8+
import { Subject, of, BehaviorSubject } from 'rxjs';
99
import { Option, none, some } from 'fp-ts/lib/Option';
1010
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
1111
import { fakeSchedulers } from 'rxjs-marbles/jest';
@@ -24,10 +24,11 @@ describe('TaskPoller', () => {
2424

2525
const work = jest.fn(async () => true);
2626
createTaskPoller<void, boolean>({
27-
pollInterval,
27+
pollInterval$: of(pollInterval),
2828
bufferCapacity,
2929
getCapacity: () => 1,
3030
work,
31+
workTimeout: pollInterval * 5,
3132
pollRequests$: new Subject<Option<void>>(),
3233
}).subscribe(() => {});
3334

@@ -40,9 +41,51 @@ describe('TaskPoller', () => {
4041
await sleep(0);
4142
expect(work).toHaveBeenCalledTimes(1);
4243

44+
await sleep(0);
45+
await sleep(0);
46+
advance(pollInterval + 10);
47+
await sleep(0);
48+
expect(work).toHaveBeenCalledTimes(2);
49+
})
50+
);
51+
52+
test(
53+
'poller adapts to pollInterval changes',
54+
fakeSchedulers(async (advance) => {
55+
const pollInterval = 100;
56+
const pollInterval$ = new BehaviorSubject(pollInterval);
57+
const bufferCapacity = 5;
58+
59+
const work = jest.fn(async () => true);
60+
createTaskPoller<void, boolean>({
61+
pollInterval$,
62+
bufferCapacity,
63+
getCapacity: () => 1,
64+
work,
65+
workTimeout: pollInterval * 5,
66+
pollRequests$: new Subject<Option<void>>(),
67+
}).subscribe(() => {});
68+
69+
// `work` is async, we have to force a node `tick`
4370
await sleep(0);
4471
advance(pollInterval);
72+
expect(work).toHaveBeenCalledTimes(1);
73+
74+
pollInterval$.next(pollInterval * 2);
75+
76+
// `work` is async, we have to force a node `tick`
77+
await sleep(0);
78+
advance(pollInterval);
79+
expect(work).toHaveBeenCalledTimes(1);
80+
advance(pollInterval);
4581
expect(work).toHaveBeenCalledTimes(2);
82+
83+
pollInterval$.next(pollInterval / 2);
84+
85+
// `work` is async, we have to force a node `tick`
86+
await sleep(0);
87+
advance(pollInterval / 2);
88+
expect(work).toHaveBeenCalledTimes(3);
4689
})
4790
);
4891

@@ -56,9 +99,10 @@ describe('TaskPoller', () => {
5699

57100
let hasCapacity = true;
58101
createTaskPoller<void, boolean>({
59-
pollInterval,
102+
pollInterval$: of(pollInterval),
60103
bufferCapacity,
61104
work,
105+
workTimeout: pollInterval * 5,
62106
getCapacity: () => (hasCapacity ? 1 : 0),
63107
pollRequests$: new Subject<Option<void>>(),
64108
}).subscribe(() => {});
@@ -113,9 +157,10 @@ describe('TaskPoller', () => {
113157
const work = jest.fn(async () => true);
114158
const pollRequests$ = new Subject<Option<void>>();
115159
createTaskPoller<void, boolean>({
116-
pollInterval,
160+
pollInterval$: of(pollInterval),
117161
bufferCapacity,
118162
work,
163+
workTimeout: pollInterval * 5,
119164
getCapacity: () => 1,
120165
pollRequests$,
121166
}).subscribe(jest.fn());
@@ -157,9 +202,10 @@ describe('TaskPoller', () => {
157202
const work = jest.fn(async () => true);
158203
const pollRequests$ = new Subject<Option<void>>();
159204
createTaskPoller<void, boolean>({
160-
pollInterval,
205+
pollInterval$: of(pollInterval),
161206
bufferCapacity,
162207
work,
208+
workTimeout: pollInterval * 5,
163209
getCapacity: () => (hasCapacity ? 1 : 0),
164210
pollRequests$,
165211
}).subscribe(() => {});
@@ -200,9 +246,10 @@ describe('TaskPoller', () => {
200246
const work = jest.fn(async () => true);
201247
const pollRequests$ = new Subject<Option<string>>();
202248
createTaskPoller<string, boolean>({
203-
pollInterval,
249+
pollInterval$: of(pollInterval),
204250
bufferCapacity,
205251
work,
252+
workTimeout: pollInterval * 5,
206253
getCapacity: () => 1,
207254
pollRequests$,
208255
}).subscribe(() => {});
@@ -235,7 +282,7 @@ describe('TaskPoller', () => {
235282
const handler = jest.fn();
236283
const pollRequests$ = new Subject<Option<string>>();
237284
createTaskPoller<string, string[]>({
238-
pollInterval,
285+
pollInterval$: of(pollInterval),
239286
bufferCapacity,
240287
work: async (...args) => {
241288
await worker;
@@ -285,7 +332,7 @@ describe('TaskPoller', () => {
285332
type ResolvableTupple = [string, PromiseLike<void> & Resolvable];
286333
const pollRequests$ = new Subject<Option<ResolvableTupple>>();
287334
createTaskPoller<[string, Resolvable], string[]>({
288-
pollInterval,
335+
pollInterval$: of(pollInterval),
289336
bufferCapacity,
290337
work: async (...resolvables) => {
291338
await Promise.all(resolvables.map(([, future]) => future));
@@ -344,11 +391,12 @@ describe('TaskPoller', () => {
344391
const handler = jest.fn();
345392
const pollRequests$ = new Subject<Option<string>>();
346393
createTaskPoller<string, string[]>({
347-
pollInterval,
394+
pollInterval$: of(pollInterval),
348395
bufferCapacity,
349396
work: async (...args) => {
350397
throw new Error('failed to work');
351398
},
399+
workTimeout: pollInterval * 5,
352400
getCapacity: () => 5,
353401
pollRequests$,
354402
}).subscribe(handler);
@@ -383,9 +431,10 @@ describe('TaskPoller', () => {
383431
return callCount;
384432
});
385433
createTaskPoller<string, number>({
386-
pollInterval,
434+
pollInterval$: of(pollInterval),
387435
bufferCapacity,
388436
work,
437+
workTimeout: pollInterval * 5,
389438
getCapacity: () => 5,
390439
pollRequests$,
391440
}).subscribe(handler);
@@ -424,9 +473,10 @@ describe('TaskPoller', () => {
424473
const work = jest.fn(async () => {});
425474
const pollRequests$ = new Subject<Option<string>>();
426475
createTaskPoller<string, void>({
427-
pollInterval,
476+
pollInterval$: of(pollInterval),
428477
bufferCapacity,
429478
work,
479+
workTimeout: pollInterval * 5,
430480
getCapacity: () => 5,
431481
pollRequests$,
432482
}).subscribe(handler);

x-pack/plugins/task_manager/server/polling/task_poller.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import { performance } from 'perf_hooks';
1212
import { after } from 'lodash';
1313
import { Subject, merge, interval, of, Observable } from 'rxjs';
14-
import { mapTo, filter, scan, concatMap, tap, catchError } from 'rxjs/operators';
14+
import { mapTo, filter, scan, concatMap, tap, catchError, switchMap } from 'rxjs/operators';
1515

1616
import { pipe } from 'fp-ts/lib/pipeable';
1717
import { Option, none, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';
@@ -30,12 +30,12 @@ import { timeoutPromiseAfter } from './timeout_promise_after';
3030
type WorkFn<T, H> = (...params: T[]) => Promise<H>;
3131

3232
interface Opts<T, H> {
33-
pollInterval: number;
33+
pollInterval$: Observable<number>;
3434
bufferCapacity: number;
3535
getCapacity: () => number;
3636
pollRequests$: Observable<Option<T>>;
3737
work: WorkFn<T, H>;
38-
workTimeout?: number;
38+
workTimeout: number;
3939
}
4040

4141
/**
@@ -52,7 +52,7 @@ interface Opts<T, H> {
5252
* of unique request argumets of type T. The queue holds all the buffered request arguments streamed in via pollRequests$
5353
*/
5454
export function createTaskPoller<T, H>({
55-
pollInterval,
55+
pollInterval$,
5656
getCapacity,
5757
pollRequests$,
5858
bufferCapacity,
@@ -67,7 +67,10 @@ export function createTaskPoller<T, H>({
6767
// emit a polling event on demand
6868
pollRequests$,
6969
// emit a polling event on a fixed interval
70-
interval(pollInterval).pipe(mapTo(none))
70+
pollInterval$.pipe(
71+
switchMap((period) => interval(period)),
72+
mapTo(none)
73+
)
7174
).pipe(
7275
// buffer all requests in a single set (to remove duplicates) as we don't want
7376
// work to take place in parallel (it could cause Task Manager to pull in the same
@@ -95,7 +98,7 @@ export function createTaskPoller<T, H>({
9598
await promiseResult<H, Error>(
9699
timeoutPromiseAfter<H, Error>(
97100
work(...pullFromSet(set, getCapacity())),
98-
workTimeout ?? pollInterval,
101+
workTimeout,
99102
() => new Error(`work has timed out`)
100103
)
101104
),

x-pack/plugins/task_manager/server/task_manager.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* or more contributor license agreements. Licensed under the Elastic License;
44
* you may not use this file except in compliance with the Elastic License.
55
*/
6-
import { Subject, Observable, Subscription } from 'rxjs';
6+
import { BehaviorSubject, Subject, Observable, Subscription } from 'rxjs';
77
import { filter } from 'rxjs/operators';
88

99
import { performance } from 'perf_hooks';
@@ -149,14 +149,17 @@ export class TaskManager {
149149
// pipe store events into the TaskManager's event stream
150150
this.store.events.subscribe((event) => this.events$.next(event));
151151

152+
const maxWorkers$ = new BehaviorSubject(opts.config.max_workers);
153+
const pollInterval$ = new BehaviorSubject(opts.config.poll_interval);
154+
152155
this.bufferedStore = new BufferedTaskStore(this.store, {
153156
bufferMaxOperations: opts.config.max_workers,
154157
logger: this.logger,
155158
});
156159

157160
this.pool = new TaskPool({
158161
logger: this.logger,
159-
maxWorkers: opts.config.max_workers,
162+
maxWorkers$,
160163
});
161164

162165
const {
@@ -166,7 +169,7 @@ export class TaskManager {
166169
this.poller$ = createObservableMonitor<Result<FillPoolResult, PollingError<string>>, Error>(
167170
() =>
168171
createTaskPoller<string, FillPoolResult>({
169-
pollInterval,
172+
pollInterval$,
170173
bufferCapacity: opts.config.request_capacity,
171174
getCapacity: () => this.pool.availableWorkers,
172175
pollRequests$: this.claimRequests$,

x-pack/plugins/task_manager/server/task_pool.test.ts

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66

77
import sinon from 'sinon';
8+
import { of, Subject } from 'rxjs';
89
import { TaskPool, TaskPoolRunResult } from './task_pool';
910
import { mockLogger, resolvable, sleep } from './test_utils';
1011
import { asOk } from './lib/result_type';
@@ -14,7 +15,7 @@ import moment from 'moment';
1415
describe('TaskPool', () => {
1516
test('occupiedWorkers are a sum of running tasks', async () => {
1617
const pool = new TaskPool({
17-
maxWorkers: 200,
18+
maxWorkers$: of(200),
1819
logger: mockLogger(),
1920
});
2021

@@ -26,7 +27,7 @@ describe('TaskPool', () => {
2627

2728
test('availableWorkers are a function of total_capacity - occupiedWorkers', async () => {
2829
const pool = new TaskPool({
29-
maxWorkers: 10,
30+
maxWorkers$: of(10),
3031
logger: mockLogger(),
3132
});
3233

@@ -36,9 +37,21 @@ describe('TaskPool', () => {
3637
expect(pool.availableWorkers).toEqual(7);
3738
});
3839

40+
test('availableWorkers is 0 until maxWorkers$ pushes a value', async () => {
41+
const maxWorkers$ = new Subject<number>();
42+
const pool = new TaskPool({
43+
maxWorkers$,
44+
logger: mockLogger(),
45+
});
46+
47+
expect(pool.availableWorkers).toEqual(0);
48+
maxWorkers$.next(10);
49+
expect(pool.availableWorkers).toEqual(10);
50+
});
51+
3952
test('does not run tasks that are beyond its available capacity', async () => {
4053
const pool = new TaskPool({
41-
maxWorkers: 2,
54+
maxWorkers$: of(2),
4255
logger: mockLogger(),
4356
});
4457

@@ -60,7 +73,7 @@ describe('TaskPool', () => {
6073
test('should log when marking a Task as running fails', async () => {
6174
const logger = mockLogger();
6275
const pool = new TaskPool({
63-
maxWorkers: 2,
76+
maxWorkers$: of(2),
6477
logger,
6578
});
6679

@@ -83,7 +96,7 @@ describe('TaskPool', () => {
8396
test('should log when running a Task fails', async () => {
8497
const logger = mockLogger();
8598
const pool = new TaskPool({
86-
maxWorkers: 3,
99+
maxWorkers$: of(3),
87100
logger,
88101
});
89102

@@ -106,7 +119,7 @@ describe('TaskPool', () => {
106119
test('should not log when running a Task fails due to the Task SO having been deleted while in flight', async () => {
107120
const logger = mockLogger();
108121
const pool = new TaskPool({
109-
maxWorkers: 3,
122+
maxWorkers$: of(3),
110123
logger,
111124
});
112125

@@ -130,7 +143,7 @@ describe('TaskPool', () => {
130143
test('Running a task which fails still takes up capacity', async () => {
131144
const logger = mockLogger();
132145
const pool = new TaskPool({
133-
maxWorkers: 1,
146+
maxWorkers$: of(1),
134147
logger,
135148
});
136149

@@ -147,7 +160,7 @@ describe('TaskPool', () => {
147160

148161
test('clears up capacity when a task completes', async () => {
149162
const pool = new TaskPool({
150-
maxWorkers: 1,
163+
maxWorkers$: of(1),
151164
logger: mockLogger(),
152165
});
153166

@@ -193,7 +206,7 @@ describe('TaskPool', () => {
193206
test('run cancels expired tasks prior to running new tasks', async () => {
194207
const logger = mockLogger();
195208
const pool = new TaskPool({
196-
maxWorkers: 2,
209+
maxWorkers$: of(2),
197210
logger,
198211
});
199212

@@ -251,7 +264,7 @@ describe('TaskPool', () => {
251264
const logger = mockLogger();
252265
const pool = new TaskPool({
253266
logger,
254-
maxWorkers: 20,
267+
maxWorkers$: of(20),
255268
});
256269

257270
const cancelled = resolvable();

0 commit comments

Comments
 (0)