Skip to content
9 changes: 6 additions & 3 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { Subject, Observable, Subscription } from 'rxjs';
import { BehaviorSubject, Subject, Observable, Subscription } from 'rxjs';
import { filter } from 'rxjs/operators';

import { performance } from 'perf_hooks';
Expand Down Expand Up @@ -144,18 +144,21 @@ export class TaskManager {
// pipe store events into the TaskManager's event stream
this.store.events.subscribe((event) => this.events$.next(event));

const maxWorkers$ = new BehaviorSubject(opts.config.max_workers);
const pollInterval$ = new BehaviorSubject(opts.config.poll_interval);

this.bufferedStore = new BufferedTaskStore(this.store, {
bufferMaxOperations: opts.config.max_workers,
logger: this.logger,
});

this.pool = new TaskPool({
logger: this.logger,
maxWorkers: opts.config.max_workers,
maxWorkers$,
});

this.poller$ = createTaskPoller<string, FillPoolResult>({
pollInterval: opts.config.poll_interval,
pollInterval$,
bufferCapacity: opts.config.request_capacity,
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
Expand Down
72 changes: 61 additions & 11 deletions x-pack/plugins/task_manager/server/task_poller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import _ from 'lodash';
import { Subject } from 'rxjs';
import { Subject, of, BehaviorSubject } from 'rxjs';
import { Option, none, some } from 'fp-ts/lib/Option';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import { fakeSchedulers } from 'rxjs-marbles/jest';
Expand All @@ -24,10 +24,11 @@ describe('TaskPoller', () => {

const work = jest.fn(async () => true);
createTaskPoller<void, boolean>({
pollInterval,
pollInterval$: of(pollInterval),
bufferCapacity,
getCapacity: () => 1,
work,
workTimeout: pollInterval * 5,
pollRequests$: new Subject<Option<void>>(),
}).subscribe(() => {});

Expand All @@ -40,9 +41,51 @@ describe('TaskPoller', () => {
await sleep(0);
expect(work).toHaveBeenCalledTimes(1);

await sleep(0);
await sleep(0);
advance(pollInterval + 10);
await sleep(0);
expect(work).toHaveBeenCalledTimes(2);
})
);

test(
'poller adapts to pollInterval changes',
fakeSchedulers(async (advance) => {
const pollInterval = 100;
const pollInterval$ = new BehaviorSubject(pollInterval);
const bufferCapacity = 5;

const work = jest.fn(async () => true);
createTaskPoller<void, boolean>({
pollInterval$,
bufferCapacity,
getCapacity: () => 1,
work,
workTimeout: pollInterval * 5,
pollRequests$: new Subject<Option<void>>(),
}).subscribe(() => {});

// `work` is async, we have to force a node `tick`
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(1);

pollInterval$.next(pollInterval * 2);

// `work` is async, we have to force a node `tick`
await sleep(0);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(1);
advance(pollInterval);
expect(work).toHaveBeenCalledTimes(2);

pollInterval$.next(pollInterval / 2);

// `work` is async, we have to force a node `tick`
await sleep(0);
advance(pollInterval / 2);
expect(work).toHaveBeenCalledTimes(3);
})
);

Expand All @@ -56,9 +99,10 @@ describe('TaskPoller', () => {

let hasCapacity = true;
createTaskPoller<void, boolean>({
pollInterval,
pollInterval$: of(pollInterval),
bufferCapacity,
work,
workTimeout: pollInterval * 5,
getCapacity: () => (hasCapacity ? 1 : 0),
pollRequests$: new Subject<Option<void>>(),
}).subscribe(() => {});
Expand Down Expand Up @@ -113,9 +157,10 @@ describe('TaskPoller', () => {
const work = jest.fn(async () => true);
const pollRequests$ = new Subject<Option<void>>();
createTaskPoller<void, boolean>({
pollInterval,
pollInterval$: of(pollInterval),
bufferCapacity,
work,
workTimeout: pollInterval * 5,
getCapacity: () => 1,
pollRequests$,
}).subscribe(jest.fn());
Expand Down Expand Up @@ -157,9 +202,10 @@ describe('TaskPoller', () => {
const work = jest.fn(async () => true);
const pollRequests$ = new Subject<Option<void>>();
createTaskPoller<void, boolean>({
pollInterval,
pollInterval$: of(pollInterval),
bufferCapacity,
work,
workTimeout: pollInterval * 5,
getCapacity: () => (hasCapacity ? 1 : 0),
pollRequests$,
}).subscribe(() => {});
Expand Down Expand Up @@ -200,9 +246,10 @@ describe('TaskPoller', () => {
const work = jest.fn(async () => true);
const pollRequests$ = new Subject<Option<string>>();
createTaskPoller<string, boolean>({
pollInterval,
pollInterval$: of(pollInterval),
bufferCapacity,
work,
workTimeout: pollInterval * 5,
getCapacity: () => 1,
pollRequests$,
}).subscribe(() => {});
Expand Down Expand Up @@ -235,7 +282,7 @@ describe('TaskPoller', () => {
const handler = jest.fn();
const pollRequests$ = new Subject<Option<string>>();
createTaskPoller<string, string[]>({
pollInterval,
pollInterval$: of(pollInterval),
bufferCapacity,
work: async (...args) => {
await worker;
Expand Down Expand Up @@ -285,7 +332,7 @@ describe('TaskPoller', () => {
type ResolvableTupple = [string, PromiseLike<void> & Resolvable];
const pollRequests$ = new Subject<Option<ResolvableTupple>>();
createTaskPoller<[string, Resolvable], string[]>({
pollInterval,
pollInterval$: of(pollInterval),
bufferCapacity,
work: async (...resolvables) => {
await Promise.all(resolvables.map(([, future]) => future));
Expand Down Expand Up @@ -344,11 +391,12 @@ describe('TaskPoller', () => {
const handler = jest.fn();
const pollRequests$ = new Subject<Option<string>>();
createTaskPoller<string, string[]>({
pollInterval,
pollInterval$: of(pollInterval),
bufferCapacity,
work: async (...args) => {
throw new Error('failed to work');
},
workTimeout: pollInterval * 5,
getCapacity: () => 5,
pollRequests$,
}).subscribe(handler);
Expand Down Expand Up @@ -383,9 +431,10 @@ describe('TaskPoller', () => {
return callCount;
});
createTaskPoller<string, number>({
pollInterval,
pollInterval$: of(pollInterval),
bufferCapacity,
work,
workTimeout: pollInterval * 5,
getCapacity: () => 5,
pollRequests$,
}).subscribe(handler);
Expand Down Expand Up @@ -424,9 +473,10 @@ describe('TaskPoller', () => {
const work = jest.fn(async () => {});
const pollRequests$ = new Subject<Option<string>>();
createTaskPoller<string, void>({
pollInterval,
pollInterval$: of(pollInterval),
bufferCapacity,
work,
workTimeout: pollInterval * 5,
getCapacity: () => 5,
pollRequests$,
}).subscribe(handler);
Expand Down
15 changes: 9 additions & 6 deletions x-pack/plugins/task_manager/server/task_poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import { performance } from 'perf_hooks';
import { after } from 'lodash';
import { Subject, merge, interval, of, Observable } from 'rxjs';
import { mapTo, filter, scan, concatMap, tap, catchError } from 'rxjs/operators';
import { mapTo, filter, scan, concatMap, tap, catchError, switchMap } from 'rxjs/operators';

import { pipe } from 'fp-ts/lib/pipeable';
import { Option, none, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';
Expand All @@ -30,12 +30,12 @@ import { timeoutPromiseAfter } from './lib/timeout_promise_after';
type WorkFn<T, H> = (...params: T[]) => Promise<H>;

interface Opts<T, H> {
pollInterval: number;
pollInterval$: Observable<number>;
bufferCapacity: number;
getCapacity: () => number;
pollRequests$: Observable<Option<T>>;
work: WorkFn<T, H>;
workTimeout?: number;
workTimeout: number;
}

/**
Expand All @@ -52,7 +52,7 @@ interface Opts<T, H> {
* of unique request argumets of type T. The queue holds all the buffered request arguments streamed in via pollRequests$
*/
export function createTaskPoller<T, H>({
pollInterval,
pollInterval$,
getCapacity,
pollRequests$,
bufferCapacity,
Expand All @@ -67,7 +67,10 @@ export function createTaskPoller<T, H>({
// emit a polling event on demand
pollRequests$,
// emit a polling event on a fixed interval
interval(pollInterval).pipe(mapTo(none))
pollInterval$.pipe(
switchMap((period) => interval(period)),
mapTo(none)
)
).pipe(
// buffer all requests in a single set (to remove duplicates) as we don't want
// work to take place in parallel (it could cause Task Manager to pull in the same
Expand Down Expand Up @@ -95,7 +98,7 @@ export function createTaskPoller<T, H>({
await promiseResult<H, Error>(
timeoutPromiseAfter<H, Error>(
work(...pullFromSet(set, getCapacity())),
workTimeout ?? pollInterval,
workTimeout,
() => new Error(`work has timed out`)
)
),
Expand Down
33 changes: 23 additions & 10 deletions x-pack/plugins/task_manager/server/task_pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

import sinon from 'sinon';
import { of, Subject } from 'rxjs';
import { TaskPool, TaskPoolRunResult } from './task_pool';
import { mockLogger, resolvable, sleep } from './test_utils';
import { asOk } from './lib/result_type';
Expand All @@ -14,7 +15,7 @@ import moment from 'moment';
describe('TaskPool', () => {
test('occupiedWorkers are a sum of running tasks', async () => {
const pool = new TaskPool({
maxWorkers: 200,
maxWorkers$: of(200),
logger: mockLogger(),
});

Expand All @@ -26,7 +27,7 @@ describe('TaskPool', () => {

test('availableWorkers are a function of total_capacity - occupiedWorkers', async () => {
const pool = new TaskPool({
maxWorkers: 10,
maxWorkers$: of(10),
logger: mockLogger(),
});

Expand All @@ -36,9 +37,21 @@ describe('TaskPool', () => {
expect(pool.availableWorkers).toEqual(7);
});

test('availableWorkers is 0 until maxWorkers$ pushes a value', async () => {
const maxWorkers$ = new Subject<number>();
const pool = new TaskPool({
maxWorkers$,
logger: mockLogger(),
});

expect(pool.availableWorkers).toEqual(0);
maxWorkers$.next(10);
expect(pool.availableWorkers).toEqual(10);
});

test('does not run tasks that are beyond its available capacity', async () => {
const pool = new TaskPool({
maxWorkers: 2,
maxWorkers$: of(2),
logger: mockLogger(),
});

Expand All @@ -60,7 +73,7 @@ describe('TaskPool', () => {
test('should log when marking a Task as running fails', async () => {
const logger = mockLogger();
const pool = new TaskPool({
maxWorkers: 2,
maxWorkers$: of(2),
logger,
});

Expand All @@ -83,7 +96,7 @@ describe('TaskPool', () => {
test('should log when running a Task fails', async () => {
const logger = mockLogger();
const pool = new TaskPool({
maxWorkers: 3,
maxWorkers$: of(3),
logger,
});

Expand All @@ -106,7 +119,7 @@ describe('TaskPool', () => {
test('should not log when running a Task fails due to the Task SO having been deleted while in flight', async () => {
const logger = mockLogger();
const pool = new TaskPool({
maxWorkers: 3,
maxWorkers$: of(3),
logger,
});

Expand All @@ -130,7 +143,7 @@ describe('TaskPool', () => {
test('Running a task which fails still takes up capacity', async () => {
const logger = mockLogger();
const pool = new TaskPool({
maxWorkers: 1,
maxWorkers$: of(1),
logger,
});

Expand All @@ -147,7 +160,7 @@ describe('TaskPool', () => {

test('clears up capacity when a task completes', async () => {
const pool = new TaskPool({
maxWorkers: 1,
maxWorkers$: of(1),
logger: mockLogger(),
});

Expand Down Expand Up @@ -193,7 +206,7 @@ describe('TaskPool', () => {
test('run cancels expired tasks prior to running new tasks', async () => {
const logger = mockLogger();
const pool = new TaskPool({
maxWorkers: 2,
maxWorkers$: of(2),
logger,
});

Expand Down Expand Up @@ -251,7 +264,7 @@ describe('TaskPool', () => {
const logger = mockLogger();
const pool = new TaskPool({
logger,
maxWorkers: 20,
maxWorkers$: of(20),
});

const cancelled = resolvable();
Expand Down
Loading