diff --git a/src/server/task_manager/README.md b/src/server/task_manager/README.md index 11d0417573156..139501731a69a 100644 --- a/src/server/task_manager/README.md +++ b/src/server/task_manager/README.md @@ -201,7 +201,7 @@ const manager = server.taskManager; // Schedules a task. All properties are as documented in the previous // storage section, except that here, params is an object, not a JSON // string. -const task = manager.schedule({ +const task = await manager.schedule({ taskType, runAt, interval, diff --git a/src/server/task_manager/client_wrapper.ts b/src/server/task_manager/client_wrapper.ts index 8c8d94d5d342c..e37922d11e1b6 100644 --- a/src/server/task_manager/client_wrapper.ts +++ b/src/server/task_manager/client_wrapper.ts @@ -19,6 +19,7 @@ import { TaskInstance } from './task'; import { TaskManager } from './task_manager'; +import { FetchOpts } from './task_store'; export class TaskManagerClientWrapper { private client: TaskManager | null; @@ -27,14 +28,19 @@ export class TaskManagerClientWrapper { this.client = null; } - public setClient(client: TaskManager) { + public async setClient(client: TaskManager) { this.client = client; } public schedule(task: TaskInstance) { - if (this.client == null) { - throw new Error('Task Manager Client has not been set properly!'); - } - this.client.schedule(task); + return this.client ? this.client.schedule(task) : null; + } + + public remove(id: string) { + return this.client ? this.client.remove(id) : null; + } + + public fetch(opts: FetchOpts = {}) { + return this.client ? this.client.fetch(opts) : null; } } diff --git a/src/server/task_manager/default_client.ts b/src/server/task_manager/default_client.ts new file mode 100644 index 0000000000000..d192ee40ae8c2 --- /dev/null +++ b/src/server/task_manager/default_client.ts @@ -0,0 +1,80 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { fillPool } from './fill_pool'; +import { TaskManagerLogger } from './logger'; +import { ConcreteTaskInstance, SanitizedTaskDefinition, TaskDictionary } from './task'; +import { TaskManager } from './task_manager'; +import { TaskPoller } from './task_poller'; +import { TaskPool } from './task_pool'; +import { TaskManagerRunner } from './task_runner'; +import { TaskStore } from './task_store'; + +export async function getDefaultClient( + kbnServer: any, + server: any, + config: any, + logger: TaskManagerLogger, + maxWorkers: number, + definitions: TaskDictionary +): Promise { + const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser; + const store = new TaskStore({ + index: config.get('taskManager.index'), + callCluster, + maxAttempts: config.get('taskManager.max_attempts'), + supportedTypes: Object.keys(definitions), + }); + + logger.debug('Initializing the task manager index'); + await store.init(); + + const pool = new TaskPool({ + logger, + maxWorkers, + }); + + const contextProvider = async (taskInstance: ConcreteTaskInstance) => ({ + callCluster, + kbnServer, + taskInstance, + }); + + const poller = new TaskPoller({ + logger, + pollInterval: config.get('taskManager.poll_interval'), + work: () => + fillPool( + pool.run, + store.fetchAvailableTasks, + (instance: ConcreteTaskInstance) => + new TaskManagerRunner({ + logger, + definition: definitions[instance.taskType], + instance, + store, + contextProvider, + }) + ), + }); + + poller.start(); + + return new TaskManager({ store, poller }); +} diff --git a/src/server/task_manager/task.ts b/src/server/task_manager/task.ts index d69d3145eb4d0..df94f204ea403 100644 --- a/src/server/task_manager/task.ts +++ b/src/server/task_manager/task.ts @@ -84,10 +84,13 @@ export const validateRunResult = Joi.object({ state: Joi.object().optional(), }).optional(); -/** - * The type signature of the function that performs a task. - */ -export type RunFunction = (context: RunContext) => PromiseLike; +export type RunFunction = () => PromiseLike; + +export type CancelFunction = () => PromiseLike; + +export type TaskRunCreatorFunction = ( + context: RunContext +) => { run: RunFunction; cancel?: CancelFunction }; /** * Defines a task which can be scheduled and run by the Kibana @@ -123,14 +126,7 @@ export interface TaskDefinition { */ numWorkers?: number; - /** - * A function which, does the work this task is built to do. Note, - * this is a *function* and is not guaranteed to be called with - * the *this* context of the task. - * - * @memberof TaskDefinition - */ - run: RunFunction; + createTaskRunner: TaskRunCreatorFunction; } /** @@ -146,7 +142,7 @@ export const validateTaskDefinition = Joi.object({ description: Joi.string().optional(), timeOut: Joi.string().default('5m'), numWorkers: Joi.number().default(1), - run: Joi.func().required(), + createTaskRunner: Joi.func().required(), }).default(); /** diff --git a/src/server/task_manager/task_manager.ts b/src/server/task_manager/task_manager.ts index 693d6e48f3c5c..13fa47211310e 100644 --- a/src/server/task_manager/task_manager.ts +++ b/src/server/task_manager/task_manager.ts @@ -19,7 +19,7 @@ import { TaskInstance } from './task'; import { TaskPoller } from './task_poller'; -import { FetchOpts, FetchResult, TaskStore } from './task_store'; +import { FetchOpts, FetchResult, RawTaskDoc, TaskStore } from './task_store'; interface Opts { poller: TaskPoller; @@ -35,16 +35,17 @@ export class TaskManager { this.store = opts.store; } - public async schedule(task: TaskInstance) { - await this.store.schedule(task); + public async schedule(task: TaskInstance): Promise { + const result = await this.store.schedule(task); this.poller.attemptWork(); + return result; } public fetch(opts: FetchOpts = {}): Promise { return this.store.fetch(opts); } - public remove(id: string): Promise { + public remove(id: string): Promise { return this.store.remove(id); } } diff --git a/src/server/task_manager/task_manager_mixin.ts b/src/server/task_manager/task_manager_mixin.ts index d28d139d01194..04349dbfc3837 100644 --- a/src/server/task_manager/task_manager_mixin.ts +++ b/src/server/task_manager/task_manager_mixin.ts @@ -19,72 +19,31 @@ import Joi from 'joi'; import { TaskManagerClientWrapper } from './client_wrapper'; -import { fillPool } from './fill_pool'; +import { getDefaultClient } from './default_client'; import { TaskManagerLogger } from './logger'; import { - ConcreteTaskInstance, SanitizedTaskDefinition, TaskDefinition, TaskDictionary, validateTaskDefinition, } from './task'; -import { TaskManager } from './task_manager'; -import { TaskPoller } from './task_poller'; -import { TaskPool } from './task_pool'; -import { TaskManagerRunner } from './task_runner'; -import { TaskStore } from './task_store'; export async function taskManagerMixin(kbnServer: any, server: any, config: any) { - const logger = new TaskManagerLogger((...args) => server.log(...args)); + const logger = new TaskManagerLogger((...args: any[]) => server.log(...args)); const maxWorkers = config.get('taskManager.max_workers'); const definitions = extractTaskDefinitions(maxWorkers, kbnServer.uiExports.taskDefinitions); server.decorate('server', 'taskManager', new TaskManagerClientWrapper()); kbnServer.afterPluginsInit(async () => { - const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser; - const store = new TaskStore({ - index: config.get('taskManager.index'), - callCluster, - maxAttempts: config.get('taskManager.max_attempts'), - supportedTypes: Object.keys(definitions), - }); - - logger.debug('Initializing the task manager index'); - await store.init(); - - const pool = new TaskPool({ - logger, - maxWorkers, - }); - - const contextProvider = async (taskInstance: ConcreteTaskInstance) => ({ - callCluster, + const client = await getDefaultClient( kbnServer, - taskInstance, - }); - - const poller = new TaskPoller({ + server, + config, logger, - pollInterval: config.get('taskManager.poll_interval'), - work: () => - fillPool( - pool.run, - store.fetchAvailableTasks, - (instance: ConcreteTaskInstance) => - new TaskManagerRunner({ - logger, - instance, - store, - contextProvider, - definition: definitions[instance.taskType], - }) - ), - }); - - poller.start(); - - const client = new TaskManager({ store, poller }); + maxWorkers, + definitions + ); server.taskManager.setClient(client); }); } diff --git a/src/server/task_manager/task_runner.ts b/src/server/task_manager/task_runner.ts index b1bc0ff92ffc9..f43ff05072041 100644 --- a/src/server/task_manager/task_runner.ts +++ b/src/server/task_manager/task_runner.ts @@ -146,7 +146,8 @@ export class TaskManagerRunner implements TaskRunner { try { this.logger.debug(`Running task ${this}`); const context = await this.contextProvider(this.instance); - this.promise = this.definition.run(context); + const taskRunner = this.definition.createTaskRunner(context); + this.promise = taskRunner.run(); return this.processResult(this.validateResult(await this.promise)); } catch (error) { this.logger.warning(`Task ${this} failed ${error.stack}`); @@ -190,7 +191,7 @@ export class TaskManagerRunner implements TaskRunner { * @memberof TaskManagerRunner */ public async cancel() { - const promise: any = this.promise; + const promise: any = this.promise; // needs to be the stored taskrunner from `const taskRunner = this.definition.createTaskRunner(context)` if (promise && promise.cancel) { this.promise = undefined; diff --git a/src/server/task_manager/task_store.ts b/src/server/task_manager/task_store.ts index 16d9e922ed2f7..83408428dd874 100644 --- a/src/server/task_manager/task_store.ts +++ b/src/server/task_manager/task_store.ts @@ -40,7 +40,7 @@ export interface FetchResult { } // Internal, the raw document, as stored in the Kibana index. -interface RawTaskDoc { +export interface RawTaskDoc { _id: string; _index: string; _type: string; @@ -142,7 +142,7 @@ export class TaskStore { * * @param task - The task being scheduled. */ - public schedule(task: TaskInstance) { + public schedule(task: TaskInstance): Promise { return this.callCluster('index', { index: this.index, type: DOC_TYPE, @@ -230,7 +230,7 @@ export class TaskStore { * @param {string} id * @returns {Promise} */ - public async remove(id: string): Promise { + public async remove(id: string): Promise { return this.callCluster('delete', { id, index: this.index,