From 1ef085df3dbd8e6f6ac6170403d83ca37e465de6 Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Thu, 27 Jan 2022 13:31:33 -0500 Subject: [PATCH 1/8] Adding REMOVED_TYPES to task manager and only marking those types as unrecognized --- x-pack/plugins/task_manager/server/plugin.ts | 3 ++- .../task_manager/server/polling_lifecycle.ts | 3 +++ .../mark_available_tasks_as_claimed.ts | 27 +++++++++++++------ .../server/queries/task_claiming.ts | 22 ++++++++------- .../server/task_type_dictionary.ts | 5 ++++ 5 files changed, 42 insertions(+), 18 deletions(-) diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index bb4c461758f96..b58b0665c10c0 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -22,7 +22,7 @@ import { TaskManagerConfig } from './config'; import { createInitialMiddleware, addMiddlewareToChain, Middleware } from './lib/middleware'; import { removeIfExists } from './lib/remove_if_exists'; import { setupSavedObjects } from './saved_objects'; -import { TaskDefinitionRegistry, TaskTypeDictionary } from './task_type_dictionary'; +import { TaskDefinitionRegistry, TaskTypeDictionary, REMOVED_TYPES } from './task_type_dictionary'; import { FetchResult, SearchOpts, TaskStore } from './task_store'; import { createManagedConfiguration } from './lib/create_managed_configuration'; import { TaskScheduling } from './task_scheduling'; @@ -189,6 +189,7 @@ export class TaskManagerPlugin this.taskPollingLifecycle = new TaskPollingLifecycle({ config: this.config!, definitions: this.definitions, + unusedTypes: REMOVED_TYPES, logger: this.logger, executionContext, taskStore, diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index b61891d732f5e..a452c8a3f82fb 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -50,6 +50,7 @@ import { TaskClaiming, ClaimOwnershipResult } from './queries/task_claiming'; export type TaskPollingLifecycleOpts = { logger: Logger; definitions: TaskTypeDictionary; + unusedTypes: string[]; taskStore: TaskStore; config: TaskManagerConfig; middleware: Middleware; @@ -106,6 +107,7 @@ export class TaskPollingLifecycle { config, taskStore, definitions, + unusedTypes, executionContext, usageCounter, }: TaskPollingLifecycleOpts) { @@ -134,6 +136,7 @@ export class TaskPollingLifecycle { maxAttempts: config.max_attempts, excludedTaskTypes: config.unsafe.exclude_task_types, definitions, + unusedTypes, logger: this.logger, getCapacity: (taskType?: string) => taskType && this.definitions.get(taskType)?.maxConcurrency diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts index b1ccb191bdce0..5f2aa25253b0c 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts @@ -104,15 +104,25 @@ if (doc['task.runAt'].size()!=0) { }; export const SortByRunAtAndRetryAt = SortByRunAtAndRetryAtScript as estypes.SortCombinations; -export const updateFieldsAndMarkAsFailed = ( +export interface UpdateFieldsAndMarkAsFailedOpts { fieldUpdates: { [field: string]: string | number | Date; - }, - claimTasksById: string[], - claimableTaskTypes: string[], - skippedTaskTypes: string[], - taskMaxAttempts: { [field: string]: number } -): ScriptClause => { + }; + claimTasksById: string[]; + claimableTaskTypes: string[]; + skippedTaskTypes: string[]; + unusedTaskTypes: string[]; + taskMaxAttempts: { [field: string]: number }; +} + +export const updateFieldsAndMarkAsFailed = ({ + fieldUpdates, + claimTasksById, + claimableTaskTypes, + skippedTaskTypes, + unusedTaskTypes, + taskMaxAttempts, +}: UpdateFieldsAndMarkAsFailedOpts): ScriptClause => { const markAsClaimingScript = `ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates) .map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`) .join(' ')}`; @@ -126,7 +136,7 @@ export const updateFieldsAndMarkAsFailed = ( } } else if (params.skippedTaskTypes.contains(ctx._source.task.taskType) && params.claimTasksById.contains(ctx._id)) { ${markAsClaimingScript} - } else if (!params.skippedTaskTypes.contains(ctx._source.task.taskType)) { + } else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) { ctx._source.task.status = "unrecognized"; } else { ctx.op = "noop"; @@ -137,6 +147,7 @@ export const updateFieldsAndMarkAsFailed = ( claimTasksById, claimableTaskTypes, skippedTaskTypes, + unusedTaskTypes, taskMaxAttempts, }, }; diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.ts index b45591a233e19..1b4f0fdb73683 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.ts @@ -57,6 +57,7 @@ import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running'; export interface TaskClaimingOpts { logger: Logger; definitions: TaskTypeDictionary; + unusedTypes: string[]; taskStore: TaskStore; maxAttempts: number; excludedTaskTypes: string[]; @@ -121,6 +122,7 @@ export class TaskClaiming { private readonly taskClaimingBatchesByType: TaskClaimingBatches; private readonly taskMaxAttempts: Record; private readonly excludedTaskTypes: string[]; + private readonly unusedTypes: string[]; /** * Constructs a new TaskStore. @@ -137,6 +139,7 @@ export class TaskClaiming { this.taskClaimingBatchesByType = this.partitionIntoClaimingBatches(this.definitions); this.taskMaxAttempts = Object.fromEntries(this.normalizeMaxAttempts(this.definitions)); this.excludedTaskTypes = opts.excludedTaskTypes; + this.unusedTypes = opts.unusedTypes; this.events$ = new Subject(); } @@ -225,7 +228,7 @@ export class TaskClaiming { return of(accumulatedResult); } return from( - this.executClaimAvailableTasks({ + this.executeClaimAvailableTasks({ claimOwnershipUntil, claimTasksById: claimTasksById.splice(0, capacity), size: capacity, @@ -249,7 +252,7 @@ export class TaskClaiming { ); } - private executClaimAvailableTasks = async ({ + private executeClaimAvailableTasks = async ({ claimOwnershipUntil, claimTasksById = [], size, @@ -403,16 +406,17 @@ export class TaskClaiming { : queryForScheduledTasks, filterDownBy(InactiveTasks) ); - const script = updateFieldsAndMarkAsFailed( - { + const script = updateFieldsAndMarkAsFailed({ + fieldUpdates: { ownerId: this.taskStore.taskManagerId, retryAt: claimOwnershipUntil, }, - claimTasksById || [], - taskTypesToClaim, - taskTypesToSkip, - pick(this.taskMaxAttempts, taskTypesToClaim) - ); + claimTasksById: claimTasksById || [], + claimableTaskTypes: taskTypesToClaim, + skippedTaskTypes: taskTypesToSkip, + unusedTaskTypes: this.unusedTypes, + taskMaxAttempts: pick(this.taskMaxAttempts, taskTypesToClaim), + }); const apmTrans = apm.startTransaction( TASK_MANAGER_MARK_AS_CLAIMED, diff --git a/x-pack/plugins/task_manager/server/task_type_dictionary.ts b/x-pack/plugins/task_manager/server/task_type_dictionary.ts index 3bc60284efc8f..b51d7d33b470a 100644 --- a/x-pack/plugins/task_manager/server/task_type_dictionary.ts +++ b/x-pack/plugins/task_manager/server/task_type_dictionary.ts @@ -8,6 +8,11 @@ import { TaskDefinition, taskDefinitionSchema, TaskRunCreatorFunction } from './task'; import { Logger } from '../../../../src/core/server'; +/** + * Types that are no longer registered and will be marked as unregistered + */ +export const REMOVED_TYPES: string[] = [].sort(); + /** * Defines a task which can be scheduled and run by the Kibana * task manager. From 7dbccbc9f9d042b25e48d1caa757c40d9a82575a Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Fri, 28 Jan 2022 11:10:01 -0500 Subject: [PATCH 2/8] Adding unit tests --- .../server/polling_lifecycle.test.ts | 1 + .../mark_available_tasks_as_claimed.test.ts | 47 ++++--- .../server/queries/task_claiming.test.ts | 125 ++++++++++++++++++ 3 files changed, 158 insertions(+), 15 deletions(-) diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index b6a93b14f578b..cf29d1f475c6c 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -70,6 +70,7 @@ describe('TaskPollingLifecycle', () => { }, taskStore: mockTaskStore, logger: taskManagerLogger, + unusedTypes: [], definitions: new TaskTypeDictionary(taskManagerLogger), middleware: createInitialMiddleware(), maxWorkersConfiguration$: of(100), diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts index 9e31ab9f0cb4e..18ed1a5802538 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts @@ -47,15 +47,16 @@ describe('mark_available_tasks_as_claimed', () => { // status running or claiming with a retryAt <= now. shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt) ), - script: updateFieldsAndMarkAsFailed( + script: updateFieldsAndMarkAsFailed({ fieldUpdates, - claimTasksById || [], - definitions.getAllTypes(), - [], - Array.from(definitions).reduce((accumulator, [type, { maxAttempts }]) => { + claimTasksById: claimTasksById || [], + claimableTaskTypes: definitions.getAllTypes(), + skippedTaskTypes: [], + unusedTaskTypes: [], + taskMaxAttempts: Array.from(definitions).reduce((accumulator, [type, { maxAttempts }]) => { return { ...accumulator, [type]: maxAttempts || defaultMaxAttempts }; - }, {}) - ), + }, {}), + }), sort: SortByRunAtAndRetryAt, }).toEqual({ query: { @@ -126,7 +127,7 @@ if (doc['task.runAt'].size()!=0) { ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates) .map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`) .join(' ')} - } else if (!params.skippedTaskTypes.contains(ctx._source.task.taskType)) { + } else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) { ctx._source.task.status = "unrecognized"; } else { ctx.op = "noop"; @@ -140,6 +141,7 @@ if (doc['task.runAt'].size()!=0) { claimTasksById: [], claimableTaskTypes: ['sampleTask', 'otherTask'], skippedTaskTypes: [], + unusedTaskTypes: [], taskMaxAttempts: { sampleTask: 5, otherTask: 1, @@ -164,9 +166,16 @@ if (doc['task.runAt'].size()!=0) { ]; expect( - updateFieldsAndMarkAsFailed(fieldUpdates, claimTasksById, ['foo', 'bar'], [], { - foo: 5, - bar: 2, + updateFieldsAndMarkAsFailed({ + fieldUpdates, + claimTasksById, + claimableTaskTypes: ['foo', 'bar'], + skippedTaskTypes: [], + unusedTaskTypes: [], + taskMaxAttempts: { + foo: 5, + bar: 2, + }, }) ).toMatchObject({ source: ` @@ -182,7 +191,7 @@ if (doc['task.runAt'].size()!=0) { ctx._source.task.status = "claiming"; ${Object.keys(fieldUpdates) .map((field) => `ctx._source.task.${field}=params.fieldUpdates.${field};`) .join(' ')} - } else if (!params.skippedTaskTypes.contains(ctx._source.task.taskType)) { + } else if (params.unusedTaskTypes.contains(ctx._source.task.taskType)) { ctx._source.task.status = "unrecognized"; } else { ctx.op = "noop"; @@ -196,6 +205,7 @@ if (doc['task.runAt'].size()!=0) { ], claimableTaskTypes: ['foo', 'bar'], skippedTaskTypes: [], + unusedTaskTypes: [], taskMaxAttempts: { foo: 5, bar: 2, @@ -213,9 +223,16 @@ if (doc['task.runAt'].size()!=0) { }; expect( - updateFieldsAndMarkAsFailed(fieldUpdates, [], ['foo', 'bar'], [], { - foo: 5, - bar: 2, + updateFieldsAndMarkAsFailed({ + fieldUpdates, + claimTasksById: [], + claimableTaskTypes: ['foo', 'bar'], + skippedTaskTypes: [], + unusedTaskTypes: [], + taskMaxAttempts: { + foo: 5, + bar: 2, + }, }).source ).toMatch(/ctx.op = "noop"/); }); diff --git a/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts b/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts index ed656b5144956..7b46f10adaabc 100644 --- a/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts +++ b/x-pack/plugins/task_manager/server/queries/task_claiming.test.ts @@ -109,6 +109,7 @@ describe('TaskClaiming', () => { logger: taskManagerLogger, definitions, excludedTaskTypes: [], + unusedTypes: [], taskStore: taskStoreMock.create({ taskManagerId: '' }), maxAttempts: 2, getCapacity: () => 10, @@ -127,12 +128,14 @@ describe('TaskClaiming', () => { hits = [generateFakeTasks(1)], versionConflicts = 2, excludedTaskTypes = [], + unusedTaskTypes = [], }: { storeOpts: Partial; taskClaimingOpts: Partial; hits?: ConcreteTaskInstance[][]; versionConflicts?: number; excludedTaskTypes?: string[]; + unusedTaskTypes?: string[]; }) { const definitions = storeOpts.definitions ?? taskDefinitions; const store = taskStoreMock.create({ taskManagerId: storeOpts.taskManagerId }); @@ -161,6 +164,7 @@ describe('TaskClaiming', () => { definitions, taskStore: store, excludedTaskTypes, + unusedTypes: unusedTaskTypes, maxAttempts: taskClaimingOpts.maxAttempts ?? 2, getCapacity: taskClaimingOpts.getCapacity ?? (() => 10), ...taskClaimingOpts, @@ -176,6 +180,7 @@ describe('TaskClaiming', () => { hits = [generateFakeTasks(1)], versionConflicts = 2, excludedTaskTypes = [], + unusedTaskTypes = [], }: { storeOpts: Partial; taskClaimingOpts: Partial; @@ -183,12 +188,14 @@ describe('TaskClaiming', () => { hits?: ConcreteTaskInstance[][]; versionConflicts?: number; excludedTaskTypes?: string[]; + unusedTaskTypes?: string[]; }) { const getCapacity = taskClaimingOpts.getCapacity ?? (() => 10); const { taskClaiming, store } = initialiseTestClaiming({ storeOpts, taskClaimingOpts, excludedTaskTypes, + unusedTaskTypes, hits, versionConflicts, }); @@ -496,6 +503,7 @@ if (doc['task.runAt'].size()!=0) { ], claimableTaskTypes: ['foo', 'bar'], skippedTaskTypes: [], + unusedTaskTypes: [], taskMaxAttempts: { bar: customMaxAttempts, foo: maxAttempts, @@ -614,6 +622,7 @@ if (doc['task.runAt'].size()!=0) { 'anotherLimitedToOne', 'limitedToTwo', ], + unusedTaskTypes: [], taskMaxAttempts: { unlimited: maxAttempts, }, @@ -871,6 +880,121 @@ if (doc['task.runAt'].size()!=0) { expect(firstCycle).not.toMatchObject(secondCycle); }); + test('it passes any unusedTaskTypes to script', async () => { + const maxAttempts = _.random(2, 43); + const customMaxAttempts = _.random(44, 100); + const taskManagerId = uuid.v1(); + const fieldUpdates = { + ownerId: taskManagerId, + retryAt: new Date(Date.now()), + }; + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + title: 'foo', + createTaskRunner: jest.fn(), + }, + bar: { + title: 'bar', + maxAttempts: customMaxAttempts, + createTaskRunner: jest.fn(), + }, + foobar: { + title: 'foobar', + maxAttempts: customMaxAttempts, + createTaskRunner: jest.fn(), + }, + }); + + const [ + { + args: { + updateByQuery: [{ query, script }], + }, + }, + ] = await testClaimAvailableTasks({ + storeOpts: { + definitions, + taskManagerId, + }, + taskClaimingOpts: { + maxAttempts, + }, + claimingOpts: { + claimOwnershipUntil: new Date(), + }, + excludedTaskTypes: ['foobar'], + unusedTaskTypes: ['barfoo'], + }); + expect(query).toMatchObject({ + bool: { + must: [ + { + bool: { + should: [ + { + bool: { + must: [ + { term: { 'task.status': 'idle' } }, + { range: { 'task.runAt': { lte: 'now' } } }, + ], + }, + }, + { + bool: { + must: [ + { + bool: { + should: [ + { term: { 'task.status': 'running' } }, + { term: { 'task.status': 'claiming' } }, + ], + }, + }, + { range: { 'task.retryAt': { lte: 'now' } } }, + ], + }, + }, + ], + }, + }, + ], + filter: [ + { + bool: { + must_not: [ + { + bool: { + should: [ + { term: { 'task.status': 'running' } }, + { term: { 'task.status': 'claiming' } }, + ], + must: { range: { 'task.retryAt': { gt: 'now' } } }, + }, + }, + ], + }, + }, + ], + }, + }); + expect(script).toMatchObject({ + source: expect.any(String), + lang: 'painless', + params: { + fieldUpdates, + claimTasksById: [], + claimableTaskTypes: ['foo', 'bar'], + skippedTaskTypes: ['foobar'], + unusedTaskTypes: ['barfoo'], + taskMaxAttempts: { + bar: customMaxAttempts, + foo: maxAttempts, + }, + }, + }); + }); + test('it claims tasks by setting their ownerId, status and retryAt', async () => { const taskManagerId = uuid.v1(); const claimOwnershipUntil = new Date(Date.now()); @@ -1263,6 +1387,7 @@ if (doc['task.runAt'].size()!=0) { logger: taskManagerLogger, definitions, excludedTaskTypes: [], + unusedTypes: [], taskStore, maxAttempts: 2, getCapacity, From 976a14ff76e837a17959a917b2680a616c477dee Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Fri, 28 Jan 2022 13:57:43 -0500 Subject: [PATCH 3/8] Fixing functional test --- .../server/task_type_dictionary.ts | 5 ++- .../task_manager_removed_types/data.json | 31 +++++++++++++++++++ .../task_management_removed_types.ts | 12 +++++-- 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_type_dictionary.ts b/x-pack/plugins/task_manager/server/task_type_dictionary.ts index b51d7d33b470a..79fa4b7d2b25e 100644 --- a/x-pack/plugins/task_manager/server/task_type_dictionary.ts +++ b/x-pack/plugins/task_manager/server/task_type_dictionary.ts @@ -11,7 +11,10 @@ import { Logger } from '../../../../src/core/server'; /** * Types that are no longer registered and will be marked as unregistered */ -export const REMOVED_TYPES: string[] = [].sort(); +export const REMOVED_TYPES: string[] = [ + // for testing + 'sampleTaskRemovedType', +]; /** * Defines a task which can be scheduled and run by the Kibana diff --git a/x-pack/test/functional/es_archives/task_manager_removed_types/data.json b/x-pack/test/functional/es_archives/task_manager_removed_types/data.json index 8594e9d567b8a..3fc1a2cad2d28 100644 --- a/x-pack/test/functional/es_archives/task_manager_removed_types/data.json +++ b/x-pack/test/functional/es_archives/task_manager_removed_types/data.json @@ -1,3 +1,34 @@ +{ + "type": "doc", + "value": { + "id": "task:ce7e1250-3322-11eb-94c1-db6995e83f6b", + "index": ".kibana_task_manager_1", + "source": { + "migrationVersion": { + "task": "7.6.0" + }, + "references": [ + ], + "task": { + "attempts": 0, + "params": "{\"originalParams\":{},\"superFly\":\"My middleware param!\"}", + "retryAt": "2020-11-30T15:43:39.626Z", + "runAt": "2020-11-30T15:43:08.277Z", + "scheduledAt": "2020-11-30T15:43:08.277Z", + "scope": [ + "testing" + ], + "startedAt": null, + "state": "{}", + "status": "idle", + "taskType": "sampleTaskNotRegisteredType" + }, + "type": "task", + "updated_at": "2020-11-30T15:43:08.277Z" + } + } +} + { "type": "doc", "value": { diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts index 61223b8b67e64..90590f1e3e572 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management_removed_types.ts @@ -45,9 +45,10 @@ export default function ({ getService }: FtrProviderContext) { const config = getService('config'); const request = supertest(url.format(config.get('servers.kibana'))); + const UNREGISTERED_TASK_TYPE_ID = 'ce7e1250-3322-11eb-94c1-db6995e83f6b'; const REMOVED_TASK_TYPE_ID = 'be7e1250-3322-11eb-94c1-db6995e83f6a'; - describe('removed task types', () => { + describe('not registered task types', () => { before(async () => { await esArchiver.load('x-pack/test/functional/es_archives/task_manager_removed_types'); }); @@ -76,7 +77,7 @@ export default function ({ getService }: FtrProviderContext) { .then((response) => response.body); } - it('should successfully schedule registered tasks and mark unregistered tasks as unrecognized', async () => { + it('should successfully schedule registered tasks, not claim unregistered tasks and mark removed task types as unrecognized', async () => { const scheduledTask = await scheduleTask({ taskType: 'sampleTask', schedule: { interval: `1s` }, @@ -85,16 +86,21 @@ export default function ({ getService }: FtrProviderContext) { await retry.try(async () => { const tasks = (await currentTasks()).docs; - expect(tasks.length).to.eql(2); + expect(tasks.length).to.eql(3); const taskIds = tasks.map((task) => task.id); expect(taskIds).to.contain(scheduledTask.id); + expect(taskIds).to.contain(UNREGISTERED_TASK_TYPE_ID); expect(taskIds).to.contain(REMOVED_TASK_TYPE_ID); const scheduledTaskInstance = tasks.find((task) => task.id === scheduledTask.id); + const unregisteredTaskInstance = tasks.find( + (task) => task.id === UNREGISTERED_TASK_TYPE_ID + ); const removedTaskInstance = tasks.find((task) => task.id === REMOVED_TASK_TYPE_ID); expect(scheduledTaskInstance?.status).to.eql('claiming'); + expect(unregisteredTaskInstance?.status).to.eql('idle'); expect(removedTaskInstance?.status).to.eql('unrecognized'); }); }); From 7a5cf7be676f4505aa2c84ba66fe8d2686b44839 Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Mon, 31 Jan 2022 08:37:05 -0500 Subject: [PATCH 4/8] Throwing error when registering a removed task type --- .../server/task_type_dictionary.test.ts | 58 ++++++++++++++++++- .../server/task_type_dictionary.ts | 5 ++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/x-pack/plugins/task_manager/server/task_type_dictionary.test.ts b/x-pack/plugins/task_manager/server/task_type_dictionary.test.ts index d682d40a1d811..cb2f436fa8676 100644 --- a/x-pack/plugins/task_manager/server/task_type_dictionary.test.ts +++ b/x-pack/plugins/task_manager/server/task_type_dictionary.test.ts @@ -7,7 +7,12 @@ import { get } from 'lodash'; import { RunContext, TaskDefinition } from './task'; -import { sanitizeTaskDefinitions, TaskDefinitionRegistry } from './task_type_dictionary'; +import { mockLogger } from './test_utils'; +import { + sanitizeTaskDefinitions, + TaskDefinitionRegistry, + TaskTypeDictionary, +} from './task_type_dictionary'; interface Opts { numTasks: number; @@ -40,6 +45,12 @@ const getMockTaskDefinitions = (opts: Opts) => { }; describe('taskTypeDictionary', () => { + let definitions: TaskTypeDictionary; + + beforeEach(() => { + definitions = new TaskTypeDictionary(mockLogger()); + }); + describe('sanitizeTaskDefinitions', () => {}); it('provides tasks with defaults', () => { const taskDefinitions = getMockTaskDefinitions({ numTasks: 3 }); @@ -154,4 +165,49 @@ describe('taskTypeDictionary', () => { `"Invalid timeout \\"1.5h\\". Timeout must be of the form \\"{number}{cadance}\\" where number is an integer. Example: 5m."` ); }); + + describe('registerTaskDefinitions', () => { + it('registers a valid task', () => { + definitions.registerTaskDefinitions({ + foo: { + title: 'foo', + maxConcurrency: 2, + createTaskRunner: jest.fn(), + }, + }); + expect(definitions.has('foo')).toBe(true); + }); + + it('throws error when registering duplicate task type', () => { + definitions.registerTaskDefinitions({ + foo: { + title: 'foo', + maxConcurrency: 2, + createTaskRunner: jest.fn(), + }, + }); + + expect(() => { + definitions.registerTaskDefinitions({ + foo: { + title: 'foo2', + createTaskRunner: jest.fn(), + }, + }); + }).toThrowErrorMatchingInlineSnapshot(`"Task foo is already defined!"`); + }); + + it('throws error when registering removed task type', () => { + expect(() => { + definitions.registerTaskDefinitions({ + sampleTaskRemovedType: { + title: 'removed', + createTaskRunner: jest.fn(), + }, + }); + }).toThrowErrorMatchingInlineSnapshot( + `"Task sampleTaskRemovedType has been removed from registration!"` + ); + }); + }); }); diff --git a/x-pack/plugins/task_manager/server/task_type_dictionary.ts b/x-pack/plugins/task_manager/server/task_type_dictionary.ts index 79fa4b7d2b25e..fbb26bbb5ce5f 100644 --- a/x-pack/plugins/task_manager/server/task_type_dictionary.ts +++ b/x-pack/plugins/task_manager/server/task_type_dictionary.ts @@ -117,6 +117,11 @@ export class TaskTypeDictionary { throw new Error(`Task ${duplicate} is already defined!`); } + const removed = Object.keys(taskDefinitions).find((type) => REMOVED_TYPES.indexOf(type) >= 0); + if (removed) { + throw new Error(`Task ${removed} has been removed from registration!`); + } + try { for (const definition of sanitizeTaskDefinitions(taskDefinitions)) { this.definitions.set(definition.type, definition); From e126374310aa2ff25152b4fe14aa3e6e519e1b9a Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Mon, 31 Jan 2022 09:13:22 -0500 Subject: [PATCH 5/8] Adding migration --- .../server/saved_objects/migrations.test.ts | 58 +++++++++++++++++++ .../server/saved_objects/migrations.ts | 26 +++++++++ 2 files changed, 84 insertions(+) diff --git a/x-pack/plugins/task_manager/server/saved_objects/migrations.test.ts b/x-pack/plugins/task_manager/server/saved_objects/migrations.test.ts index 73141479d9081..83046f7387b0d 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/migrations.test.ts +++ b/x-pack/plugins/task_manager/server/saved_objects/migrations.test.ts @@ -108,6 +108,64 @@ describe('successful migrations', () => { }); }); }); + + describe('8.2.0', () => { + test('resets "unrecognized" status to "idle" when task type is not in REMOVED_TYPES list', () => { + const migration820 = getMigrations()['8.2.0']; + const taskInstance = getMockData({ + taskType: 'someValidTask', + status: 'unrecognized', + }); + + expect(migration820(taskInstance, migrationContext)).toEqual({ + ...taskInstance, + attributes: { + ...taskInstance.attributes, + status: 'idle', + }, + }); + }); + + test('does not modify "unrecognized" status when task type is in REMOVED_TYPES list', () => { + const migration820 = getMigrations()['8.2.0']; + const taskInstance = getMockData({ + taskType: 'sampleTaskRemovedType', + status: 'unrecognized', + }); + + expect(migration820(taskInstance, migrationContext)).toEqual(taskInstance); + }); + + test('does not modify document when status is "running"', () => { + const migration820 = getMigrations()['8.2.0']; + const taskInstance = getMockData({ + taskType: 'someTask', + status: 'running', + }); + + expect(migration820(taskInstance, migrationContext)).toEqual(taskInstance); + }); + + test('does not modify document when status is "idle"', () => { + const migration820 = getMigrations()['8.2.0']; + const taskInstance = getMockData({ + taskType: 'someTask', + status: 'idle', + }); + + expect(migration820(taskInstance, migrationContext)).toEqual(taskInstance); + }); + + test('does not modify document when status is "failed"', () => { + const migration820 = getMigrations()['8.2.0']; + const taskInstance = getMockData({ + taskType: 'someTask', + status: 'failed', + }); + + expect(migration820(taskInstance, migrationContext)).toEqual(taskInstance); + }); + }); }); describe('handles errors during migrations', () => { diff --git a/x-pack/plugins/task_manager/server/saved_objects/migrations.ts b/x-pack/plugins/task_manager/server/saved_objects/migrations.ts index 89bbb3d783881..1d24c4f97175c 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/migrations.ts +++ b/x-pack/plugins/task_manager/server/saved_objects/migrations.ts @@ -5,6 +5,7 @@ * 2.0. */ +import { ConcreteTaskInstance } from '..'; import { LogMeta, SavedObjectMigrationContext, @@ -14,6 +15,7 @@ import { SavedObjectUnsanitizedDoc, } from '../../../../../src/core/server'; import { TaskInstance, TaskInstanceWithDeprecatedFields } from '../task'; +import { REMOVED_TYPES } from '../task_type_dictionary'; interface TaskInstanceLogMeta extends LogMeta { migrations: { taskInstanceDocument: SavedObjectUnsanitizedDoc }; @@ -37,6 +39,7 @@ export function getMigrations(): SavedObjectMigrationMap { pipeMigrations(alertingTaskLegacyIdToSavedObjectIds, actionsTasksLegacyIdToSavedObjectIds), '8.0.0' ), + '8.2.0': executeMigrationWithErrorHandling(resetUnrecognizedStatus, '8.2.0'), }; } @@ -142,6 +145,29 @@ function moveIntervalIntoSchedule({ }; } +function resetUnrecognizedStatus( + doc: SavedObjectUnsanitizedDoc +): SavedObjectUnsanitizedDoc { + const status = (doc as SavedObjectUnsanitizedDoc)?.attributes?.status; + if (status && status === 'unrecognized') { + const taskType = doc.attributes.taskType; + // If task type is in the REMOVED_TYPES list, maintain "unrecognized" status + if (REMOVED_TYPES.indexOf(taskType) >= 0) { + return doc; + } + + return { + ...doc, + attributes: { + ...doc.attributes, + status: 'idle', + }, + } as SavedObjectUnsanitizedDoc; + } + + return doc; +} + function pipeMigrations(...migrations: TaskInstanceMigration[]): TaskInstanceMigration { return (doc: SavedObjectUnsanitizedDoc) => migrations.reduce((migratedDoc, nextMigration) => nextMigration(migratedDoc), doc); From 821bc7d907ce88c42cac506cff8235c127abc092 Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Mon, 31 Jan 2022 09:31:32 -0500 Subject: [PATCH 6/8] Adding functional tests --- .../es_archives/task_manager_tasks/data.json | 62 +++++++++++++++++++ .../test_suites/task_manager/migrations.ts | 37 ++++++++++- 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/x-pack/test/functional/es_archives/task_manager_tasks/data.json b/x-pack/test/functional/es_archives/task_manager_tasks/data.json index b59abd341a7af..cc9dae07da439 100644 --- a/x-pack/test/functional/es_archives/task_manager_tasks/data.json +++ b/x-pack/test/functional/es_archives/task_manager_tasks/data.json @@ -58,4 +58,66 @@ "updated_at": "2020-11-30T15:43:08.277Z" } } +} + +{ + "type": "doc", + "value": { + "id": "task:ce7e1250-3322-11eb-94c1-db6995e84f6d", + "index": ".kibana_task_manager_1", + "source": { + "migrationVersion": { + "task": "7.16.0" + }, + "references": [ + ], + "task": { + "attempts": 0, + "params": "{\"spaceId\":\"user1\",\"alertId\":\"0359d7fcc04da9878ee9aadbda38ba55\"}", + "retryAt": "2020-11-30T15:43:39.626Z", + "runAt": "2020-11-30T15:43:08.277Z", + "scheduledAt": "2020-11-30T15:43:08.277Z", + "scope": [ + "testing" + ], + "startedAt": null, + "state": "{}", + "status": "unrecognized", + "taskType": "alerting:0359d7fcc04da9878ee9aadbda38ba55" + }, + "type": "task", + "updated_at": "2020-11-30T15:43:08.277Z" + } + } +} + +{ + "type": "doc", + "value": { + "id": "task:fe7e1250-3322-11eb-94c1-db6395e84f6e", + "index": ".kibana_task_manager_1", + "source": { + "migrationVersion": { + "task": "7.16.0" + }, + "references": [ + ], + "task": { + "attempts": 0, + "params": "{\"spaceId\":\"user1\",\"alertId\":\"0359d7fcc04da9878ee9aadbda38ba55\"}", + "retryAt": "2020-11-30T15:43:39.626Z", + "runAt": "2020-11-30T15:43:08.277Z", + "scheduledAt": "2020-11-30T15:43:08.277Z", + "scope": [ + "testing" + ], + "startedAt": null, + "state": "{}", + "status": "unrecognized", + "taskType": "sampleTaskRemovedType" + }, + "type": "task", + "updated_at": "2020-11-30T15:43:08.277Z" + } + } } \ No newline at end of file diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/migrations.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/migrations.ts index 329aee7e74b98..34d057ea60035 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/migrations.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/migrations.ts @@ -8,7 +8,10 @@ import expect from '@kbn/expect'; import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import type { TransportResult } from '@elastic/elasticsearch'; -import { TaskInstanceWithDeprecatedFields } from '../../../../plugins/task_manager/server/task'; +import { + ConcreteTaskInstance, + TaskInstanceWithDeprecatedFields, +} from '../../../../plugins/task_manager/server/task'; import { FtrProviderContext } from '../../../common/ftr_provider_context'; import { SavedObjectsUtils } from '../../../../../src/core/server/saved_objects'; @@ -76,5 +79,37 @@ export default function createGetTests({ getService }: FtrProviderContext) { )}"}` ); }); + + it('8.2.0 migrates tasks with unrecognized status to idle if task type is removed', async () => { + const response = await es.get<{ task: ConcreteTaskInstance }>( + { + index: '.kibana_task_manager', + id: 'task:ce7e1250-3322-11eb-94c1-db6995e84f6d', + }, + { + meta: true, + } + ); + expect(response.statusCode).to.eql(200); + expect(response.body._source?.task.taskType).to.eql( + `alerting:0359d7fcc04da9878ee9aadbda38ba55` + ); + expect(response.body._source?.task.status).to.eql(`idle`); + }); + + it('8.2.0 does not migrate tasks with unrecognized status if task type is valid', async () => { + const response = await es.get<{ task: ConcreteTaskInstance }>( + { + index: '.kibana_task_manager', + id: 'task:fe7e1250-3322-11eb-94c1-db6395e84f6e', + }, + { + meta: true, + } + ); + expect(response.statusCode).to.eql(200); + expect(response.body._source?.task.taskType).to.eql(`sampleTaskRemovedType`); + expect(response.body._source?.task.status).to.eql(`unrecognized`); + }); }); } From bda2fdd0faa5dcef8ec0eadbfe48237ecc03772d Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Wed, 9 Feb 2022 16:34:01 -0500 Subject: [PATCH 7/8] Cleanup --- x-pack/plugins/task_manager/server/saved_objects/migrations.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugins/task_manager/server/saved_objects/migrations.ts b/x-pack/plugins/task_manager/server/saved_objects/migrations.ts index c8e3177e144c4..6e527918f2a7e 100644 --- a/x-pack/plugins/task_manager/server/saved_objects/migrations.ts +++ b/x-pack/plugins/task_manager/server/saved_objects/migrations.ts @@ -5,7 +5,6 @@ * 2.0. */ -import { ConcreteTaskInstance } from '..'; import { LogMeta, SavedObjectMigrationContext, From 2dee057cdebd95eb8622a04ca13a4ca7426653e1 Mon Sep 17 00:00:00 2001 From: Ying Mao Date: Tue, 15 Feb 2022 08:57:43 -0500 Subject: [PATCH 8/8] Adding disabled siem signals rule type --- x-pack/plugins/task_manager/server/task_type_dictionary.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/plugins/task_manager/server/task_type_dictionary.ts b/x-pack/plugins/task_manager/server/task_type_dictionary.ts index fbb26bbb5ce5f..a2ea46122acf8 100644 --- a/x-pack/plugins/task_manager/server/task_type_dictionary.ts +++ b/x-pack/plugins/task_manager/server/task_type_dictionary.ts @@ -14,6 +14,9 @@ import { Logger } from '../../../../src/core/server'; export const REMOVED_TYPES: string[] = [ // for testing 'sampleTaskRemovedType', + + // deprecated in https://github.com/elastic/kibana/pull/121442 + 'alerting:siem.signals', ]; /**