Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/server/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ const manager = server.taskManager;
// Schedules a task. All properties are as documented in the previous
// storage section, except that here, params is an object, not a JSON
// string.
const task = manager.schedule({
const task = await manager.schedule({
taskType,
runAt,
interval,
Expand Down
16 changes: 11 additions & 5 deletions src/server/task_manager/client_wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import { TaskInstance } from './task';
import { TaskManager } from './task_manager';
import { FetchOpts } from './task_store';

export class TaskManagerClientWrapper {
private client: TaskManager | null;
Expand All @@ -27,14 +28,19 @@ export class TaskManagerClientWrapper {
this.client = null;
}

public setClient(client: TaskManager) {
public async setClient(client: TaskManager) {
this.client = client;
}

public schedule(task: TaskInstance) {
if (this.client == null) {
throw new Error('Task Manager Client has not been set properly!');
}
this.client.schedule(task);
return this.client ? this.client.schedule(task) : null;
}

public remove(id: string) {
return this.client ? this.client.remove(id) : null;
}

public fetch(opts: FetchOpts = {}) {
return this.client ? this.client.fetch(opts) : null;
}
}
80 changes: 80 additions & 0 deletions src/server/task_manager/default_client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { fillPool } from './fill_pool';
import { TaskManagerLogger } from './logger';
import { ConcreteTaskInstance, SanitizedTaskDefinition, TaskDictionary } from './task';
import { TaskManager } from './task_manager';
import { TaskPoller } from './task_poller';
import { TaskPool } from './task_pool';
import { TaskManagerRunner } from './task_runner';
import { TaskStore } from './task_store';

export async function getDefaultClient(
kbnServer: any,
server: any,
config: any,
logger: TaskManagerLogger,
maxWorkers: number,
definitions: TaskDictionary<SanitizedTaskDefinition>
): Promise<TaskManager> {
const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser;
const store = new TaskStore({
index: config.get('taskManager.index'),
callCluster,
maxAttempts: config.get('taskManager.max_attempts'),
supportedTypes: Object.keys(definitions),
});

logger.debug('Initializing the task manager index');
await store.init();

const pool = new TaskPool({
logger,
maxWorkers,
});

const contextProvider = async (taskInstance: ConcreteTaskInstance) => ({
callCluster,
kbnServer,
taskInstance,
});

const poller = new TaskPoller({
logger,
pollInterval: config.get('taskManager.poll_interval'),
work: () =>
fillPool(
pool.run,
store.fetchAvailableTasks,
(instance: ConcreteTaskInstance) =>
new TaskManagerRunner({
logger,
definition: definitions[instance.taskType],
instance,
store,
contextProvider,
})
),
});

poller.start();

return new TaskManager({ store, poller });
}
22 changes: 9 additions & 13 deletions src/server/task_manager/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,13 @@ export const validateRunResult = Joi.object({
state: Joi.object().optional(),
}).optional();

/**
* The type signature of the function that performs a task.
*/
export type RunFunction = (context: RunContext) => PromiseLike<RunResult | undefined>;
export type RunFunction = () => PromiseLike<RunResult | undefined>;

export type CancelFunction = () => PromiseLike<RunResult | undefined>;

export type TaskRunCreatorFunction = (
context: RunContext
) => { run: RunFunction; cancel?: CancelFunction };

/**
* Defines a task which can be scheduled and run by the Kibana
Expand Down Expand Up @@ -123,14 +126,7 @@ export interface TaskDefinition {
*/
numWorkers?: number;

/**
* A function which, does the work this task is built to do. Note,
* this is a *function* and is not guaranteed to be called with
* the *this* context of the task.
*
* @memberof TaskDefinition
*/
run: RunFunction;
createTaskRunner: TaskRunCreatorFunction;
}

/**
Expand All @@ -146,7 +142,7 @@ export const validateTaskDefinition = Joi.object({
description: Joi.string().optional(),
timeOut: Joi.string().default('5m'),
numWorkers: Joi.number().default(1),
run: Joi.func().required(),
createTaskRunner: Joi.func().required(),
}).default();

/**
Expand Down
9 changes: 5 additions & 4 deletions src/server/task_manager/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { TaskInstance } from './task';
import { TaskPoller } from './task_poller';
import { FetchOpts, FetchResult, TaskStore } from './task_store';
import { FetchOpts, FetchResult, RawTaskDoc, TaskStore } from './task_store';

interface Opts {
poller: TaskPoller;
Expand All @@ -35,16 +35,17 @@ export class TaskManager {
this.store = opts.store;
}

public async schedule(task: TaskInstance) {
await this.store.schedule(task);
public async schedule(task: TaskInstance): Promise<RawTaskDoc> {
const result = await this.store.schedule(task);
this.poller.attemptWork();
return result;
}

public fetch(opts: FetchOpts = {}): Promise<FetchResult> {
return this.store.fetch(opts);
}

public remove(id: string): Promise<void> {
public remove(id: string): Promise<RawTaskDoc> {
return this.store.remove(id);
}
}
57 changes: 8 additions & 49 deletions src/server/task_manager/task_manager_mixin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,72 +19,31 @@

import Joi from 'joi';
import { TaskManagerClientWrapper } from './client_wrapper';
import { fillPool } from './fill_pool';
import { getDefaultClient } from './default_client';
import { TaskManagerLogger } from './logger';
import {
ConcreteTaskInstance,
SanitizedTaskDefinition,
TaskDefinition,
TaskDictionary,
validateTaskDefinition,
} from './task';
import { TaskManager } from './task_manager';
import { TaskPoller } from './task_poller';
import { TaskPool } from './task_pool';
import { TaskManagerRunner } from './task_runner';
import { TaskStore } from './task_store';

export async function taskManagerMixin(kbnServer: any, server: any, config: any) {
const logger = new TaskManagerLogger((...args) => server.log(...args));
const logger = new TaskManagerLogger((...args: any[]) => server.log(...args));
const maxWorkers = config.get('taskManager.max_workers');
const definitions = extractTaskDefinitions(maxWorkers, kbnServer.uiExports.taskDefinitions);

server.decorate('server', 'taskManager', new TaskManagerClientWrapper());

kbnServer.afterPluginsInit(async () => {
const callCluster = server.plugins.elasticsearch.getCluster('admin').callWithInternalUser;
const store = new TaskStore({
index: config.get('taskManager.index'),
callCluster,
maxAttempts: config.get('taskManager.max_attempts'),
supportedTypes: Object.keys(definitions),
});

logger.debug('Initializing the task manager index');
await store.init();

const pool = new TaskPool({
logger,
maxWorkers,
});

const contextProvider = async (taskInstance: ConcreteTaskInstance) => ({
callCluster,
const client = await getDefaultClient(
kbnServer,
taskInstance,
});

const poller = new TaskPoller({
server,
config,
logger,
pollInterval: config.get('taskManager.poll_interval'),
work: () =>
fillPool(
pool.run,
store.fetchAvailableTasks,
(instance: ConcreteTaskInstance) =>
new TaskManagerRunner({
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this to the default client, and reordered the fields to be in the order that TaskManagerRunner defines. VS Code was giving a warning about this before & after I touched it

logger,
instance,
store,
contextProvider,
definition: definitions[instance.taskType],
})
),
});

poller.start();

const client = new TaskManager({ store, poller });
maxWorkers,
definitions
);
server.taskManager.setClient(client);
});
}
Expand Down
5 changes: 3 additions & 2 deletions src/server/task_manager/task_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ export class TaskManagerRunner implements TaskRunner {
try {
this.logger.debug(`Running task ${this}`);
const context = await this.contextProvider(this.instance);
this.promise = this.definition.run(context);
const taskRunner = this.definition.createTaskRunner(context);
this.promise = taskRunner.run();
return this.processResult(this.validateResult(await this.promise));
} catch (error) {
this.logger.warning(`Task ${this} failed ${error.stack}`);
Expand Down Expand Up @@ -190,7 +191,7 @@ export class TaskManagerRunner implements TaskRunner {
* @memberof TaskManagerRunner
*/
public async cancel() {
const promise: any = this.promise;
const promise: any = this.promise; // needs to be the stored taskrunner from `const taskRunner = this.definition.createTaskRunner(context)`

if (promise && promise.cancel) {
this.promise = undefined;
Expand Down
6 changes: 3 additions & 3 deletions src/server/task_manager/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export interface FetchResult {
}

// Internal, the raw document, as stored in the Kibana index.
interface RawTaskDoc {
export interface RawTaskDoc {
_id: string;
_index: string;
_type: string;
Expand Down Expand Up @@ -142,7 +142,7 @@ export class TaskStore {
*
* @param task - The task being scheduled.
*/
public schedule(task: TaskInstance) {
public schedule(task: TaskInstance): Promise<RawTaskDoc> {
return this.callCluster('index', {
index: this.index,
type: DOC_TYPE,
Expand Down Expand Up @@ -230,7 +230,7 @@ export class TaskStore {
* @param {string} id
* @returns {Promise<void>}
*/
public async remove(id: string): Promise<void> {
public async remove(id: string): Promise<RawTaskDoc> {
return this.callCluster('delete', {
id,
index: this.index,
Expand Down