Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion x-pack/legacy/plugins/task_manager/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import _ from 'lodash';
import sinon from 'sinon';
import { TaskManager } from './task_manager';
import { TaskManager, claimAvailableTasks } from './task_manager';
import { SavedObjectsClientMock } from 'src/core/server/mocks';
import { SavedObjectsSerializer, SavedObjectsSchema } from 'src/core/server';
import { mockLogger } from './test_utils';
Expand Down Expand Up @@ -66,6 +66,7 @@ describe('TaskManager', () => {
const promise = client.schedule(task);
client.start();
await promise;

expect(savedObjectsClient.create).toHaveBeenCalled();
});

Expand Down Expand Up @@ -164,4 +165,28 @@ describe('TaskManager', () => {
/Cannot add middleware after the task manager is initialized/i
);
});

describe('claimAvailableTasks', () => {
test('should claim Available Tasks when there are available workers', () => {
const logger = mockLogger();
const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 }));

const availableWorkers = 1;

claimAvailableTasks(claim, availableWorkers, logger);

expect(claim).toHaveBeenCalledTimes(1);
});

test('shouldnt claim Available Tasks when there are no available workers', () => {
const logger = mockLogger();
const claim = jest.fn(() => Promise.resolve({ docs: [], claimedTasks: 0 }));

const availableWorkers = 0;

claimAvailableTasks(claim, availableWorkers, logger);

expect(claim).not.toHaveBeenCalled();
});
});
});
58 changes: 42 additions & 16 deletions x-pack/legacy/plugins/task_manager/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ import {
import { TaskPoller } from './task_poller';
import { TaskPool } from './task_pool';
import { TaskManagerRunner } from './task_runner';
import { FetchOpts, FetchResult, TaskStore } from './task_store';
import {
FetchOpts,
FetchResult,
TaskStore,
OwnershipClaimingOpts,
ClaimOwnershipResult,
} from './task_store';

export interface TaskManagerOpts {
logger: Logger;
Expand Down Expand Up @@ -103,7 +109,17 @@ export class TaskManager {
const poller = new TaskPoller({
logger: this.logger,
pollInterval: opts.config.get('xpack.task_manager.poll_interval'),
work: (): Promise<void> => fillPool(pool.run, () => this.claimAvailableTasks(), createRunner),
work: (): Promise<void> =>
fillPool(
pool.run,
() =>
claimAvailableTasks(
this.store.claimAvailableTasks.bind(this.store),
this.pool.availableWorkers,
this.logger
),
createRunner
),
});

this.pool = pool;
Expand Down Expand Up @@ -135,20 +151,6 @@ export class TaskManager {
startPoller();
}

private async claimAvailableTasks() {
const { docs, claimedTasks } = await this.store.claimAvailableTasks({
size: this.pool.availableWorkers,
claimOwnershipUntil: intervalFromNow('30s')!,
});

if (docs.length !== claimedTasks) {
this.logger.warn(
`[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched`
);
}
return docs;
}

private async waitUntilStarted() {
if (!this.isStarted) {
await new Promise(resolve => {
Expand Down Expand Up @@ -247,3 +249,27 @@ export class TaskManager {
}
}
}

export async function claimAvailableTasks(
claim: (opts: OwnershipClaimingOpts) => Promise<ClaimOwnershipResult>,
availableWorkers: number,
logger: Logger
) {
if (availableWorkers > 0) {
const { docs, claimedTasks } = await claim({
size: availableWorkers,
claimOwnershipUntil: intervalFromNow('30s')!,
});

if (docs.length !== claimedTasks) {
logger.warn(
`[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched`
);
}
return docs;
}
logger.info(
`[Task Ownership]: Task Manager has skipped Claiming Ownership of available tasks at it has ran out Available Workers. If this happens often, consider adjusting the "xpack.task_manager.max_workers" configuration.`
);
return [];
}