diff --git a/x-pack/plugins/task_manager/constants.ts b/x-pack/plugins/task_manager/constants.ts new file mode 100644 index 0000000000000..3b671842d4c51 --- /dev/null +++ b/x-pack/plugins/task_manager/constants.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import xPackage from '../../package.json'; +import { getTemplateVersion } from './lib/get_template_version'; + +export const TASK_MANAGER_API_VERSION = 1; +export const TASK_MANAGER_TEMPLATE_VERSION = getTemplateVersion(xPackage.version); diff --git a/x-pack/plugins/task_manager/lib/get_template_version.test.ts b/x-pack/plugins/task_manager/lib/get_template_version.test.ts new file mode 100644 index 0000000000000..a1c83c8d2d391 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/get_template_version.test.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { getTemplateVersion } from './get_template_version'; + +describe('getTemplateVersion', () => { + it('converts a release build version string into an integer', () => { + const versionStr1 = '7.1.2'; + expect(getTemplateVersion(versionStr1)).toBe(7010299); + + const versionStr2 = '10.1.0'; + expect(getTemplateVersion(versionStr2)).toBe(10010099); + }); + + it('converts a alpha build version string into an integer', () => { + const versionStr1 = '7.0.0-alpha1'; + expect(getTemplateVersion(versionStr1)).toBe(7000001); + + const versionStr2 = '7.0.0-alpha3'; + expect(getTemplateVersion(versionStr2)).toBe(7000003); + }); + + it('converts a beta build version string into an integer', () => { + const versionStr1 = '7.0.0-beta4'; + expect(getTemplateVersion(versionStr1)).toBe(7000029); + + const versionStr5 = '7.0.0-beta8'; + expect(getTemplateVersion(versionStr5)).toBe(7000033); + }); + + it('converts a snapshot build version string into an integer', () => { + expect(getTemplateVersion('8.0.0-alpha1')).toBe(8000001); + expect(getTemplateVersion('8.0.0-alpha1-snapshot')).toBe(8000001); + }); + + it('not intended to handle any version parts with 3-digits: it will create malformed integer values', () => { + expect(getTemplateVersion('60.61.1') === getTemplateVersion('6.6.101')).toBe(true); // both produce 60610199 + expect(getTemplateVersion('1.32.0') < getTemplateVersion('1.3.223423')).toBe(true); + }); +}); diff --git a/x-pack/plugins/task_manager/lib/get_template_version.ts b/x-pack/plugins/task_manager/lib/get_template_version.ts new file mode 100644 index 0000000000000..eac9d09685a42 --- /dev/null +++ b/x-pack/plugins/task_manager/lib/get_template_version.ts @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { padLeft } from 'lodash'; + +/* + * The logic for ID is: XXYYZZAA, where XX is major version, YY is minor + * version, ZZ is revision, and AA is alpha/beta/rc indicator. + * + * AA values below 25 are for alpha builder (since 5.0), and above 25 and below + * 50 are beta builds, and below 99 are RC builds, with 99 indicating a release + * the (internal) format of the id is there so we can easily do after/before + * checks on the id + * + * Note: the conversion method is carried over from Elasticsearch: + * https://github.com/elastic/elasticsearch/blob/de962b2/server/src/main/java/org/elasticsearch/Version.java + */ +export function getTemplateVersion(versionStr: string): number { + // break up the string parts + const splitted = versionStr.split('.'); + const minorStr = splitted[2] || ''; + + // pad each part with leading 0 to make 2 characters + const padded = splitted.map((v: string) => { + const vMatches = v.match(/\d+/); + if (vMatches) { + return padLeft(vMatches[0], 2, '0'); + } + return '00'; + }); + const [majorV, minorV, patchV] = padded; + + // append the alpha/beta/rc indicator + let buildV; + if (minorStr.match('alpha')) { + const matches = minorStr.match(/alpha(?\d+)/); + if (matches != null && matches.groups != null) { + const alphaVerInt = parseInt(matches.groups.alpha, 10); // alpha build indicator + buildV = padLeft(`${alphaVerInt}`, 2, '0'); + } + } else if (minorStr.match('beta')) { + const matches = minorStr.match(/beta(?\d+)/); + if (matches != null && matches.groups != null) { + const alphaVerInt = parseInt(matches.groups.beta, 10) + 25; // beta build indicator + buildV = padLeft(`${alphaVerInt}`, 2, '0'); + } + } else { + buildV = '99'; // release build indicator + } + + const joinedParts = [majorV, minorV, patchV, buildV].join(''); + return parseInt(joinedParts, 10); +} diff --git a/x-pack/plugins/task_manager/lib/middleware.test.ts b/x-pack/plugins/task_manager/lib/middleware.test.ts index 5d81420a319ff..4add3a81501a1 100644 --- a/x-pack/plugins/task_manager/lib/middleware.test.ts +++ b/x-pack/plugins/task_manager/lib/middleware.test.ts @@ -25,6 +25,7 @@ const getMockConcreteTaskInstance = () => { attempts: number; status: TaskStatus; runAt: Date; + scheduledAt: Date; state: any; taskType: string; params: any; @@ -35,6 +36,7 @@ const getMockConcreteTaskInstance = () => { attempts: 0, status: 'idle', runAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()), + scheduledAt: new Date(moment('2018-09-18T05:33:09.588Z').valueOf()), state: {}, taskType: 'nice_task', params: { abc: 'def' }, @@ -55,7 +57,7 @@ const defaultBeforeRun = async (opts: RunContext) => { }; describe('addMiddlewareToChain', () => { - it('chains the beforeSave functions', () => { + it('chains the beforeSave functions', async () => { const m1 = { beforeSave: async (opts: BeforeSaveOpts) => { Object.assign(opts.taskInstance.params, { m1: true }); @@ -82,8 +84,10 @@ describe('addMiddlewareToChain', () => { middlewareChain = addMiddlewareToChain(m1, m2); middlewareChain = addMiddlewareToChain(middlewareChain, m3); - middlewareChain.beforeSave({ taskInstance: getMockTaskInstance() }).then((saveOpts: any) => { - expect(saveOpts).toMatchInlineSnapshot(` + await middlewareChain + .beforeSave({ taskInstance: getMockTaskInstance() }) + .then((saveOpts: any) => { + expect(saveOpts).toMatchInlineSnapshot(` Object { "taskInstance": Object { "params": Object { @@ -97,10 +101,10 @@ Object { }, } `); - }); + }); }); - it('chains the beforeRun functions', () => { + it('chains the beforeRun functions', async () => { const m1 = { beforeSave: defaultBeforeSave, beforeRun: async (opts: RunContext) => { @@ -133,7 +137,7 @@ Object { middlewareChain = addMiddlewareToChain(m1, m2); middlewareChain = addMiddlewareToChain(middlewareChain, m3); - middlewareChain + await middlewareChain .beforeRun(getMockRunContext(getMockConcreteTaskInstance())) .then(contextOpts => { expect(contextOpts).toMatchInlineSnapshot(` @@ -150,6 +154,7 @@ Object { }, "primaryTerm": 1, "runAt": 2018-09-18T05:33:09.588Z, + "scheduledAt": 2018-09-18T05:33:09.588Z, "sequenceNumber": 1, "state": Object {}, "status": "idle", diff --git a/x-pack/plugins/task_manager/task.ts b/x-pack/plugins/task_manager/task.ts index 01c2a24e0351d..393c0a8fb28cb 100644 --- a/x-pack/plugins/task_manager/task.ts +++ b/x-pack/plugins/task_manager/task.ts @@ -158,6 +158,12 @@ export interface TaskInstance { */ taskType: string; + /** + * The date and time that this task was originally scheduled. This is used + * for convenience to task run functions, and for troubleshooting. + */ + scheduledAt?: Date; + /** * The date and time that this task is scheduled to be run. It is not * guaranteed to run at this time, but it is guaranteed not to run earlier @@ -215,6 +221,12 @@ export interface ConcreteTaskInstance extends TaskInstance { */ primaryTerm: number; + /** + * The date and time that this task was originally scheduled. This is used + * for convenience to task run functions, and for troubleshooting. + */ + scheduledAt: Date; + /** * The number of unsuccessful attempts since the last successful run. This * will be zeroed out after a successful run. diff --git a/x-pack/plugins/task_manager/task_manager.test.ts b/x-pack/plugins/task_manager/task_manager.test.ts index 1f5738a657a2b..58f3c6f50bc33 100644 --- a/x-pack/plugins/task_manager/task_manager.test.ts +++ b/x-pack/plugins/task_manager/task_manager.test.ts @@ -75,6 +75,9 @@ describe('TaskManager', () => { }); function testOpts() { + const callCluster = sinon.stub(); + callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} })); + const $test = { events: {} as any, afterPluginsInit: _.noop, @@ -100,7 +103,7 @@ describe('TaskManager', () => { plugins: { elasticsearch: { getCluster() { - return { callWithInternalUser: _.noop }; + return { callWithInternalUser: callCluster }; }, status: { on(eventName: string, callback: () => any) { diff --git a/x-pack/plugins/task_manager/task_manager.ts b/x-pack/plugins/task_manager/task_manager.ts index fae633331f4bf..cffa9029d4382 100644 --- a/x-pack/plugins/task_manager/task_manager.ts +++ b/x-pack/plugins/task_manager/task_manager.ts @@ -53,11 +53,15 @@ export class TaskManager { const logger = new TaskManagerLogger((...args: any[]) => server.log(...args)); + /* Kibana UUID needs to be pulled live (not cached), as it takes a long time + * to initialize, and can change after startup */ const store = new TaskStore({ callCluster: server.plugins.elasticsearch.getCluster('admin').callWithInternalUser, index: config.get('xpack.task_manager.index'), maxAttempts: config.get('xpack.task_manager.max_attempts'), supportedTypes: Object.keys(this.definitions), + logger, + getKibanaUuid: () => config.get('server.uuid'), }); const pool = new TaskPool({ logger, @@ -94,6 +98,7 @@ export class TaskManager { this.isInitialized = true; }) .catch((err: Error) => { + // FIXME: check the type of error to make sure it's actually an ES error logger.warning(err.message); // rety again to initialize store and poller, using the timing of diff --git a/x-pack/plugins/task_manager/task_poller.test.ts b/x-pack/plugins/task_manager/task_poller.test.ts index 1a581c7b9c037..23f604e861c14 100644 --- a/x-pack/plugins/task_manager/task_poller.test.ts +++ b/x-pack/plugins/task_manager/task_poller.test.ts @@ -14,9 +14,13 @@ let store: TaskStore; describe('TaskPoller', () => { beforeEach(() => { - const callCluster = sinon.spy(); + const callCluster = sinon.stub(); + callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} })); + const getKibanaUuid = sinon.stub().returns('kibana-123-uuid-test'); store = new TaskStore({ callCluster, + getKibanaUuid, + logger: mockLogger(), index: 'tasky', maxAttempts: 2, supportedTypes: ['a', 'b', 'c'], diff --git a/x-pack/plugins/task_manager/task_runner.test.ts b/x-pack/plugins/task_manager/task_runner.test.ts index 10b65fbcd93d3..0909be3b5c471 100644 --- a/x-pack/plugins/task_manager/task_runner.test.ts +++ b/x-pack/plugins/task_manager/task_runner.test.ts @@ -233,6 +233,7 @@ describe('TaskManagerRunner', () => { sequenceNumber: 32, primaryTerm: 32, runAt: new Date(), + scheduledAt: new Date(), attempts: 0, params: {}, scope: ['reporting'], diff --git a/x-pack/plugins/task_manager/task_store.test.ts b/x-pack/plugins/task_manager/task_store.test.ts index 2e908f7b23527..91ed8bcad8a6a 100644 --- a/x-pack/plugins/task_manager/task_store.test.ts +++ b/x-pack/plugins/task_manager/task_store.test.ts @@ -6,15 +6,25 @@ import _ from 'lodash'; import sinon from 'sinon'; +import { + TASK_MANAGER_API_VERSION as API_VERSION, + TASK_MANAGER_TEMPLATE_VERSION as TEMPLATE_VERSION, +} from './constants'; import { TaskInstance, TaskStatus } from './task'; import { FetchOpts, TaskStore } from './task_store'; +import { mockLogger } from './test_utils'; + +const getKibanaUuid = sinon.stub().returns('kibana-uuid-123-test'); describe('TaskStore', () => { describe('init', () => { test('creates the task manager index', async () => { - const callCluster = sinon.spy(); + const callCluster = sinon.stub(); + callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} })); const store = new TaskStore({ callCluster, + getKibanaUuid, + logger: mockLogger(), index: 'tasky', maxAttempts: 2, supportedTypes: ['a', 'b', 'c'], @@ -22,7 +32,7 @@ describe('TaskStore', () => { await store.init(); - sinon.assert.calledOnce(callCluster); + sinon.assert.calledTwice(callCluster); // store.init calls twice: once to check for existing template, once to put the template (if needed) sinon.assert.calledWithMatch(callCluster, 'indices.putTemplate', { body: { @@ -35,28 +45,65 @@ describe('TaskStore', () => { name: 'tasky', }); }); + + test('logs a warning if newer index template exists', async () => { + const callCluster = sinon.stub(); + callCluster + .withArgs('indices.getTemplate') + .returns(Promise.resolve({ tasky: { version: Infinity } })); + + const logger = { + info: sinon.spy(), + debug: sinon.spy(), + warning: sinon.spy(), + error: sinon.spy(), + }; + + const store = new TaskStore({ + callCluster, + getKibanaUuid, + logger, + index: 'tasky', + maxAttempts: 2, + supportedTypes: ['a', 'b', 'c'], + }); + + await store.init(); + const loggingCall = logger.warning.getCall(0); + expect(loggingCall.args[0]).toBe( + `This Kibana instance defines an older template version (${TEMPLATE_VERSION}) than is currently in Elasticsearch (Infinity). ` + + `Because of the potential for non-backwards compatible changes, this Kibana instance will only be able to claim scheduled tasks with ` + + `"kibana.apiVersion" <= ${API_VERSION} in the task metadata.` + ); + expect(logger.warning.calledOnce).toBe(true); + }); }); describe('schedule', () => { async function testSchedule(task: TaskInstance) { - const callCluster = sinon.spy(() => + const callCluster = sinon.stub(); + callCluster.withArgs('index').returns( Promise.resolve({ _id: 'testid', _seq_no: 3344, _primary_term: 3344, }) ); + callCluster.withArgs('indices.getTemplate').returns(Promise.resolve({ tasky: {} })); const store = new TaskStore({ callCluster, + getKibanaUuid, + logger: mockLogger(), index: 'tasky', maxAttempts: 2, supportedTypes: ['report', 'dernstraight', 'yawn'], }); + await store.init(); const result = await store.schedule(task); - sinon.assert.calledTwice(callCluster); + sinon.assert.calledThrice(callCluster); - return { result, callCluster, arg: callCluster.args[1][1] }; + return { result, callCluster, arg: callCluster.args[2][1] }; } test('serializes the params and state', async () => { @@ -80,7 +127,7 @@ describe('TaskStore', () => { }); }); - test('retiurns a concrete task instance', async () => { + test('returns a concrete task instance', async () => { const task = { params: { hello: 'world' }, state: { foo: 'bar' }, @@ -120,6 +167,8 @@ describe('TaskStore', () => { const callCluster = sinon.spy(async () => ({ hits: { hits } })); const store = new TaskStore({ callCluster, + getKibanaUuid, + logger: mockLogger(), index: 'tasky', maxAttempts: 2, supportedTypes: ['a', 'b', 'c'], @@ -285,6 +334,7 @@ describe('TaskStore', () => { const callCluster = sinon.spy(async () => ({ hits: { hits } })); const store = new TaskStore({ callCluster, + logger: mockLogger(), supportedTypes: ['a', 'b', 'c'], index: 'tasky', maxAttempts: 2, @@ -306,6 +356,8 @@ describe('TaskStore', () => { const callCluster = sinon.spy(async () => ({ hits: { hits: [] } })); const store = new TaskStore({ callCluster, + getKibanaUuid, + logger: mockLogger(), supportedTypes: ['a', 'b', 'c'], index: 'tasky', maxAttempts: 2, @@ -341,6 +393,7 @@ describe('TaskStore', () => { { terms: { 'task.taskType': ['foo', 'bar'] } }, { range: { 'task.attempts': { lte: maxAttempts } } }, { range: { 'task.runAt': { lte: 'now' } } }, + { range: { 'kibana.apiVersion': { lte: 1 } } }, ], }, }, @@ -435,6 +488,7 @@ describe('TaskStore', () => { const runAt = new Date(); const task = { runAt, + scheduledAt: runAt, id: 'task:324242', params: { hello: 'world' }, state: { foo: 'bar' }, @@ -452,6 +506,8 @@ describe('TaskStore', () => { const store = new TaskStore({ callCluster, + getKibanaUuid, + logger: mockLogger(), index: 'tasky', maxAttempts: 2, supportedTypes: ['a', 'b', 'c'], @@ -501,6 +557,8 @@ describe('TaskStore', () => { ); const store = new TaskStore({ callCluster, + getKibanaUuid, + logger: mockLogger(), index: 'myindex', maxAttempts: 2, supportedTypes: ['a'], diff --git a/x-pack/plugins/task_manager/task_store.ts b/x-pack/plugins/task_manager/task_store.ts index fd2b9c1c720aa..a6e8c32c39872 100644 --- a/x-pack/plugins/task_manager/task_store.ts +++ b/x-pack/plugins/task_manager/task_store.ts @@ -8,15 +8,22 @@ * This module contains helpers for managing the task manager storage layer. */ +import { + TASK_MANAGER_API_VERSION as API_VERSION, + TASK_MANAGER_TEMPLATE_VERSION as TEMPLATE_VERSION, +} from './constants'; +import { Logger } from './lib/logger'; import { ConcreteTaskInstance, ElasticJs, TaskInstance, TaskStatus } from './task'; const DOC_TYPE = '_doc'; export interface StoreOpts { callCluster: ElasticJs; + getKibanaUuid: () => string; index: string; maxAttempts: number; supportedTypes: string[]; + logger: Logger; } export interface FetchOpts { @@ -46,8 +53,14 @@ export interface RawTaskDoc { _primary_term: number; _source: { type: string; + kibana: { + uuid: string; + version: number; + apiVersion: number; + }; task: { taskType: string; + scheduledAt: Date; runAt: Date; interval?: string; attempts: number; @@ -66,10 +79,12 @@ export interface RawTaskDoc { */ export class TaskStore { public readonly maxAttempts: number; + public getKibanaUuid: () => string; + public readonly index: string; private callCluster: ElasticJs; - private index: string; private supportedTypes: string[]; private _isInitialized = false; // tslint:disable-line:variable-name + private logger: Logger; /** * Constructs a new TaskStore. @@ -78,12 +93,15 @@ export class TaskStore { * @prop {string} index - The name of the task manager index * @prop {number} maxAttempts - The maximum number of attempts before a task will be abandoned * @prop {string[]} supportedTypes - The task types supported by this store + * @prop {Logger} logger - The task manager logger. */ constructor(opts: StoreOpts) { this.callCluster = opts.callCluster; this.index = opts.index; this.maxAttempts = opts.maxAttempts; this.supportedTypes = opts.supportedTypes; + this.logger = opts.logger; + this.getKibanaUuid = opts.getKibanaUuid; this.fetchAvailableTasks = this.fetchAvailableTasks.bind(this); } @@ -97,54 +115,108 @@ export class TaskStore { } /** - * Initializes the store, ensuring the task manager index is created and up to date. + * Initializes the store, ensuring the task manager index template is created + * and the version is up to date. */ public async init() { if (this._isInitialized) { throw new Error('TaskStore has already been initialized!'); } - const properties = { - type: { type: 'keyword' }, - task: { - properties: { - taskType: { type: 'keyword' }, - runAt: { type: 'date' }, - interval: { type: 'text' }, - attempts: { type: 'integer' }, - status: { type: 'keyword' }, - params: { type: 'text' }, - state: { type: 'text' }, - user: { type: 'keyword' }, - scope: { type: 'keyword' }, - }, - }, - }; + let existingVersion = -Infinity; + const templateName = this.index; try { - const templateResult = await this.callCluster('indices.putTemplate', { - name: this.index, - body: { - index_patterns: [this.index], - mappings: { - [DOC_TYPE]: { - dynamic: 'strict', - properties, - }, - }, - settings: { - number_of_shards: 1, - auto_expand_replicas: '0-1', - }, - }, + // check if template exists + const templateCheck = await this.callCluster('indices.getTemplate', { + name: templateName, + filter_path: '*.version', }); - this._isInitialized = true; - return templateResult; + // extract the existing version + const template = templateCheck[templateName] || {}; + existingVersion = template.version || 0; } catch (err) { - throw err; + if (err.statusCode !== 404) { + throw err; // ignore not found + } + } + + if (existingVersion > TEMPLATE_VERSION) { + // Do not trample a newer version template + this.logger.warning( + `This Kibana instance defines an older template version (${TEMPLATE_VERSION}) than is currently in Elasticsearch (${existingVersion}). ` + + `Because of the potential for non-backwards compatible changes, this Kibana instance will only be able to claim scheduled tasks with ` + + `"kibana.apiVersion" <= ${API_VERSION} in the task metadata.` + ); + return; + } else if (existingVersion === TEMPLATE_VERSION) { + // The latest template is already saved, so just log a debug line. + this.logger.debug( + `Not installing ${this.index} index template: version ${TEMPLATE_VERSION} already exists.` + ); + return; } - return; + // Activate template creation / update + if (existingVersion > 0) { + this.logger.info( + `Upgrading ${ + this.index + } index template. Old version: ${existingVersion}, New version: ${TEMPLATE_VERSION}.` + ); + } else { + this.logger.info(`Installing ${this.index} index template version: ${TEMPLATE_VERSION}.`); + } + + const templateResult = await this.callCluster('indices.putTemplate', { + name: templateName, + body: { + index_patterns: [this.index], + mappings: { + [DOC_TYPE]: { + dynamic: false, + properties: { + type: { type: 'keyword' }, + task: { + properties: { + taskType: { type: 'keyword' }, + scheduledAt: { type: 'date' }, + runAt: { type: 'date' }, + interval: { type: 'text' }, + attempts: { type: 'integer' }, + status: { type: 'keyword' }, + params: { type: 'text' }, + state: { type: 'text' }, + user: { type: 'keyword' }, + scope: { type: 'keyword' }, + }, + }, + kibana: { + properties: { + apiVersion: { type: 'integer' }, // 1, 2, 3, etc + uuid: { type: 'keyword' }, // + version: { type: 'integer' }, // 7000099, etc + }, + }, + }, + }, + }, + settings: { + number_of_shards: 1, + auto_expand_replicas: '0-1', + }, + version: TEMPLATE_VERSION, + }, + }); + + this._isInitialized = true; + this.logger.info( + `Installed ${ + this.index + } index template: version ${TEMPLATE_VERSION} (API version ${API_VERSION})` + ); + + return templateResult; } get isInitialized() { @@ -169,7 +241,7 @@ export class TaskStore { ); } - const { id, ...body } = rawSource(taskInstance); + const { id, ...body } = rawSource(taskInstance, this); const result = await this.callCluster('index', { id, body, @@ -185,6 +257,7 @@ export class TaskStore { primaryTerm: result._primary_term, attempts: 0, status: task.status, + scheduledAt: task.scheduledAt, runAt: task.runAt, state: taskInstance.state || {}, }; @@ -223,6 +296,7 @@ export class TaskStore { { terms: { 'task.taskType': this.supportedTypes } }, { range: { 'task.attempts': { lte: this.maxAttempts } } }, { range: { 'task.runAt': { lte: 'now' } } }, + { range: { 'kibana.apiVersion': { lte: API_VERSION } } }, ], }, }, @@ -242,7 +316,7 @@ export class TaskStore { * @returns {Promise} */ public async update(doc: ConcreteTaskInstance): Promise { - const rawDoc = taskDocToRaw(doc, this.index); + const rawDoc = taskDocToRaw(doc, this); const result = await this.callCluster('update', { body: { @@ -327,13 +401,14 @@ function paginatableSort(sort: any[] = []) { return [...sort, sortById]; } -function rawSource(doc: TaskInstance) { +function rawSource(doc: TaskInstance, store: TaskStore) { const { id, ...taskFields } = doc; const source = { ...taskFields, params: JSON.stringify(doc.params || {}), state: JSON.stringify(doc.state || {}), attempts: (doc as ConcreteTaskInstance).attempts || 0, + scheduledAt: doc.scheduledAt || new Date(), runAt: doc.runAt || new Date(), status: (doc as ConcreteTaskInstance).status || 'idle', }; @@ -347,16 +422,21 @@ function rawSource(doc: TaskInstance) { id, type: 'task', task: source, + kibana: { + uuid: store.getKibanaUuid(), // needs to be pulled live + version: TEMPLATE_VERSION, + apiVersion: API_VERSION, + }, }; } -function taskDocToRaw(doc: ConcreteTaskInstance, index: string): RawTaskDoc { - const { type, task } = rawSource(doc); +function taskDocToRaw(doc: ConcreteTaskInstance, store: TaskStore): RawTaskDoc { + const { type, task, kibana } = rawSource(doc, store); return { _id: doc.id, - _index: index, - _source: { type, task }, + _index: store.index, + _source: { type, task, kibana }, _seq_no: doc.sequenceNumber, _primary_term: doc.primaryTerm, };