diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts index 103bc17004ef0..76df8b7ae5584 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.test.ts @@ -18,8 +18,11 @@ import { RecognizedTask, OneOfTaskTypes, tasksWithPartitions, + claimSort, } from './mark_available_tasks_as_claimed'; +import { TaskStatus, TaskPriority, ConcreteTaskInstance } from '../task'; + import { TaskTypeDictionary } from '../task_type_dictionary'; import { mockLogger } from '../test_utils'; @@ -304,4 +307,129 @@ if (doc['task.runAt'].size()!=0) { } `); }); + + // Tests sorting 3 tasks with different priorities, runAt/retryAt values + // running the sort over all permutations of them. + describe('claimSort', () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + normalPriorityTask: { + title: 'normal priority', + createTaskRunner: () => ({ run: () => Promise.resolve() }), + priority: TaskPriority.Normal, // 50 + }, + noPriorityTask: { + title: 'no priority', + createTaskRunner: () => ({ run: () => Promise.resolve() }), + priority: undefined, // 50 + }, + lowPriorityTask: { + title: 'low priority', + createTaskRunner: () => ({ run: () => Promise.resolve() }), + priority: TaskPriority.Low, // 1 + }, + }); + + // possible ordering of tasks before sort + const permutations = [ + [0, 1, 2], + [0, 2, 1], + [1, 0, 2], + [1, 2, 0], + [2, 0, 1], + [2, 1, 0], + ]; + + test('works correctly with same dates, different priorities', () => { + const date = new Date(); + const baseTasks: ConcreteTaskInstance[] = []; + + // push in reverse order + baseTasks.push(buildTaskInstance({ taskType: 'lowPriorityTask', runAt: date })); + baseTasks.push(buildTaskInstance({ taskType: 'noPriorityTask', runAt: date })); + baseTasks.push(buildTaskInstance({ taskType: 'normalPriorityTask', runAt: date })); + + for (const perm of permutations) { + const tasks = [baseTasks[perm[0]], baseTasks[perm[1]], baseTasks[perm[2]]]; + const sorted = claimSort(definitions, tasks); + // all we know is low should be last + expect(sorted[2]).toBe(baseTasks[0]); + } + }); + + test('works correctly with same priorities, different dates', () => { + const baseDate = new Date('2024-07-29T00:00:00Z').valueOf(); + const baseTasks: ConcreteTaskInstance[] = []; + + // push in reverse order + baseTasks.push( + buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate + 1000) }) + ); + baseTasks.push(buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate) })); + baseTasks.push( + buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate - 1000) }) + ); + + for (const perm of permutations) { + const tasks = [baseTasks[perm[0]], baseTasks[perm[1]], baseTasks[perm[2]]]; + const sorted = claimSort(definitions, tasks); + expect(sorted[0]).toBe(baseTasks[2]); + expect(sorted[1]).toBe(baseTasks[1]); + expect(sorted[2]).toBe(baseTasks[0]); + } + }); + + test('works correctly with mixed of runAt and retryAt values', () => { + const baseDate = new Date('2024-07-29T00:00:00Z').valueOf(); + const baseTasks: ConcreteTaskInstance[] = []; + + // push in reverse order + baseTasks.push( + buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate + 1000) }) + ); + baseTasks.push( + buildTaskInstance({ + taskType: 'noPriorityTask', + runAt: new Date(baseDate - 2000), + retryAt: new Date(baseDate), // should use this value + }) + ); + baseTasks.push( + buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate - 1000) }) + ); + + for (const perm of permutations) { + const tasks = [baseTasks[perm[0]], baseTasks[perm[1]], baseTasks[perm[2]]]; + const sorted = claimSort(definitions, tasks); + expect(sorted[0]).toBe(baseTasks[2]); + expect(sorted[1]).toBe(baseTasks[1]); + expect(sorted[2]).toBe(baseTasks[0]); + } + }); + }); }); + +interface BuildTaskOpts { + taskType: string; + runAt: Date; + retryAt?: Date; +} + +let id = 1; + +function buildTaskInstance(opts: BuildTaskOpts): ConcreteTaskInstance { + const { taskType, runAt, retryAt } = opts; + return { + taskType, + id: `${id++}`, + runAt, + retryAt: retryAt || null, + scheduledAt: runAt, + attempts: 0, + status: TaskStatus.Idle, + startedAt: null, + state: {}, + params: {}, + ownerId: null, + }; +} diff --git a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts index 107a3f4466637..4e138545aec25 100644 --- a/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts +++ b/x-pack/plugins/task_manager/server/queries/mark_available_tasks_as_claimed.ts @@ -6,7 +6,7 @@ */ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; import { TaskTypeDictionary } from '../task_type_dictionary'; -import { TaskStatus, TaskPriority } from '../task'; +import { TaskStatus, TaskPriority, ConcreteTaskInstance } from '../task'; import { ScriptBasedSortClause, ScriptClause, @@ -15,23 +15,6 @@ import { MustNotCondition, } from './query_clauses'; -export function taskWithLessThanMaxAttempts(type: string, maxAttempts: number): MustCondition { - return { - bool: { - must: [ - { term: { 'task.taskType': type } }, - { - range: { - 'task.attempts': { - lt: maxAttempts, - }, - }, - }, - ], - }, - }; -} - export function tasksOfType(taskTypes: string[]): estypes.QueryDslQueryContainer { return { bool: { @@ -166,12 +149,53 @@ function getSortByPriority(definitions: TaskTypeDictionary): estypes.SortCombina }; } +// getClaimSort() is used to generate sort bits for the ES query +// should align with claimSort() below export function getClaimSort(definitions: TaskTypeDictionary): estypes.SortCombinations[] { const sortByPriority = getSortByPriority(definitions); if (!sortByPriority) return [SortByRunAtAndRetryAt]; return [sortByPriority, SortByRunAtAndRetryAt]; } +// claimSort() is used to sort tasks returned from a claimer by priority and date. +// Kept here so it should align with getClaimSort() above. +// Returns a copy of the tasks passed in. +export function claimSort( + definitions: TaskTypeDictionary, + tasks: ConcreteTaskInstance[] +): ConcreteTaskInstance[] { + const priorityMap: Record = {}; + tasks.forEach((task) => { + const taskType = task.taskType; + const priority = getPriority(definitions, taskType); + priorityMap[taskType] = priority; + }); + + return tasks.slice().sort(compare); + + function compare(a: ConcreteTaskInstance, b: ConcreteTaskInstance) { + // sort by priority, descending + const priorityA = priorityMap[a.taskType] ?? TaskPriority.Normal; + const priorityB = priorityMap[b.taskType] ?? TaskPriority.Normal; + + if (priorityA > priorityB) return -1; + if (priorityA < priorityB) return 1; + + // then sort by retry/runAt, ascending + const runA = a.retryAt?.valueOf() ?? a.runAt.valueOf() ?? 0; + const runB = b.retryAt?.valueOf() ?? b.runAt.valueOf() ?? 0; + + if (runA < runB) return -1; + if (runA > runB) return 1; + + return 0; + } +} + +function getPriority(definitions: TaskTypeDictionary, taskType: string): TaskPriority { + return definitions.get(taskType)?.priority ?? TaskPriority.Normal; +} + export interface UpdateFieldsAndMarkAsFailedOpts { fieldUpdates: { [field: string]: string | number | Date; diff --git a/x-pack/plugins/task_manager/server/task_claimers/index.ts b/x-pack/plugins/task_manager/server/task_claimers/index.ts index ff4f9f6131120..fdbe9e94aa6a9 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/index.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/index.ts @@ -83,3 +83,12 @@ export function isTaskTypeExcluded(excludedTaskTypePatterns: string[], taskType: return false; } + +export function getExcludedTaskTypes( + definitions: TaskTypeDictionary, + excludedTaskTypePatterns: string[] +) { + return definitions + .getAllTypes() + .filter((taskType) => isTaskTypeExcluded(excludedTaskTypePatterns, taskType)); +} diff --git a/x-pack/plugins/task_manager/server/task_claimers/lib/task_selector_by_capacity.ts b/x-pack/plugins/task_manager/server/task_claimers/lib/task_selector_by_capacity.ts new file mode 100644 index 0000000000000..531357436c0bf --- /dev/null +++ b/x-pack/plugins/task_manager/server/task_claimers/lib/task_selector_by_capacity.ts @@ -0,0 +1,40 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ConcreteTaskInstance } from '../../task'; +import { isLimited, TaskClaimingBatches } from '../../queries/task_claiming'; + +// given a list of tasks and capacity info, select the tasks that meet capacity +export function selectTasksByCapacity( + tasks: ConcreteTaskInstance[], + batches: TaskClaimingBatches +): ConcreteTaskInstance[] { + // create a map of task type - concurrency + const limitedBatches = batches.filter(isLimited); + const limitedMap = new Map(); + for (const limitedBatch of limitedBatches) { + const { tasksTypes, concurrency } = limitedBatch; + limitedMap.set(tasksTypes, concurrency); + } + + // apply the limited concurrency + const result: ConcreteTaskInstance[] = []; + for (const task of tasks) { + const concurrency = limitedMap.get(task.taskType); + if (concurrency == null) { + result.push(task); + continue; + } + + if (concurrency > 0) { + result.push(task); + limitedMap.set(task.taskType, concurrency - 1); + } + } + + return result; +} diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts index 959fa3468b238..4e47581ccbdd5 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.test.ts @@ -8,6 +8,8 @@ import _ from 'lodash'; import { v4 as uuidv4 } from 'uuid'; import { filter, take, toArray } from 'rxjs'; +import { SavedObjectsErrorHelpers } from '@kbn/core/server'; + import { CLAIM_STRATEGY_MGET, DEFAULT_KIBANAS_PER_PARTITION } from '../config'; import { @@ -34,7 +36,6 @@ import apm from 'elastic-apm-node'; import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running'; import { ClaimOwnershipResult } from '.'; import { FillPoolResult } from '../lib/fill_pool'; -import { SavedObjectsErrorHelpers } from '@kbn/core/server'; import { TaskPartitioner } from '../lib/task_partitioner'; import type { MustNotCondition } from '../queries/query_clauses'; import { @@ -166,12 +167,12 @@ describe('TaskClaiming', () => { } for (let i = 0; i < hits.length; i++) { - store.fetch.mockResolvedValueOnce({ docs: hits[i], versionMap: versionMaps[i] }); - store.getDocVersions.mockResolvedValueOnce(docVersion[i]); - const oneBulkGetResult = hits[i].map((hit) => asOk(hit)); - store.bulkGet.mockResolvedValueOnce(oneBulkGetResult); + store.msearch.mockResolvedValueOnce({ docs: hits[i], versionMap: versionMaps[i] }); + store.getDocVersions.mockResolvedValueOnce(versionMaps[i]); const oneBulkResult = hits[i].map((hit) => asOk(hit)); store.bulkUpdate.mockResolvedValueOnce(oneBulkResult); + const oneBulkGetResult = hits[i].map((hit) => asOk(hit)); + store.bulkGet.mockResolvedValueOnce(oneBulkGetResult); } const taskClaiming = new TaskClaiming({ @@ -235,12 +236,12 @@ describe('TaskClaiming', () => { ); expect(mockApmTrans.end).toHaveBeenCalledWith('success'); - expect(store.fetch.mock.calls).toMatchObject({}); + expect(store.msearch.mock.calls).toMatchObject({}); expect(store.getDocVersions.mock.calls).toMatchObject({}); return results.map((result, index) => ({ result, args: { - search: store.fetch.mock.calls[index][0] as SearchOpts & { + search: store.msearch.mock.calls[index][0] as SearchOpts[] & { query: MustNotCondition; }, }, @@ -273,8 +274,8 @@ describe('TaskClaiming', () => { }, }); - store.fetch.mockReset(); - store.fetch.mockRejectedValue(new Error('Oh no')); + store.msearch.mockReset(); + store.msearch.mockRejectedValue(new Error('Oh no')); await expect( getAllAsPromise( @@ -337,7 +338,7 @@ describe('TaskClaiming', () => { ]; const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce( @@ -380,7 +381,10 @@ describe('TaskClaiming', () => { { tags: ['claimAvailableTasksMget'] } ); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).toHaveBeenCalledWith([ 'task:id-1', 'task:id-2', @@ -435,7 +439,7 @@ describe('TaskClaiming', () => { ]; const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce([fetchedTasks[2]].map(asOk)); @@ -475,7 +479,10 @@ describe('TaskClaiming', () => { { tags: ['claimAvailableTasksMget'] } ); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); expect(store.bulkUpdate).toHaveBeenCalledTimes(2); expect(store.bulkUpdate).toHaveBeenNthCalledWith( @@ -526,7 +533,7 @@ describe('TaskClaiming', () => { ]; const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce([fetchedTasks[2]].map(asOk)); @@ -578,7 +585,10 @@ describe('TaskClaiming', () => { { tags: ['claimAvailableTasksMget'] } ); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); expect(store.bulkUpdate).toHaveBeenCalledTimes(2); expect(store.bulkUpdate).toHaveBeenNthCalledWith( @@ -629,7 +639,7 @@ describe('TaskClaiming', () => { ]; const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce([fetchedTasks[2]].map(asOk)); @@ -673,7 +683,10 @@ describe('TaskClaiming', () => { { tags: ['claimAvailableTasksMget'] } ); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); expect(store.bulkGet).toHaveBeenCalledWith(['id-3']); expect(store.bulkUpdate).toHaveBeenCalledTimes(2); @@ -720,7 +733,7 @@ describe('TaskClaiming', () => { const fetchedTasks: ConcreteTaskInstance[] = []; const { versionMap } = getVersionMapsFromTasks(fetchedTasks); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); const taskClaiming = new TaskClaiming({ logger: taskManagerLogger, @@ -752,7 +765,10 @@ describe('TaskClaiming', () => { expect(taskManagerLogger.debug).not.toHaveBeenCalled(); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).not.toHaveBeenCalled(); expect(store.bulkGet).not.toHaveBeenCalled(); expect(store.bulkUpdate).not.toHaveBeenCalled(); @@ -777,7 +793,7 @@ describe('TaskClaiming', () => { const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); versionMap.delete('id-1'); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce([fetchedTasks[1], fetchedTasks[2]].map(asOk)); @@ -816,7 +832,10 @@ describe('TaskClaiming', () => { { tags: ['claimAvailableTasksMget'] } ); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); expect(store.bulkUpdate).toHaveBeenCalledTimes(1); expect(store.bulkUpdate).toHaveBeenCalledWith( @@ -859,7 +878,7 @@ describe('TaskClaiming', () => { const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); docLatestVersions.delete('task:id-1'); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce([fetchedTasks[1], fetchedTasks[2]].map(asOk)); @@ -898,7 +917,10 @@ describe('TaskClaiming', () => { { tags: ['claimAvailableTasksMget'] } ); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); expect(store.bulkUpdate).toHaveBeenCalledTimes(1); expect(store.bulkUpdate).toHaveBeenCalledWith( @@ -941,7 +963,7 @@ describe('TaskClaiming', () => { const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); docLatestVersions.set('task:id-1', { esId: 'task:id-1', seqNo: 33, primaryTerm: 33 }); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce([fetchedTasks[1], fetchedTasks[2]].map(asOk)); @@ -980,7 +1002,10 @@ describe('TaskClaiming', () => { { tags: ['claimAvailableTasksMget'] } ); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).toHaveBeenCalledWith(['task:id-1', 'task:id-2', 'task:id-3']); expect(store.bulkUpdate).toHaveBeenCalledTimes(1); expect(store.bulkUpdate).toHaveBeenCalledWith( @@ -1025,7 +1050,7 @@ describe('TaskClaiming', () => { ]; const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkGet.mockResolvedValueOnce( @@ -1068,7 +1093,10 @@ describe('TaskClaiming', () => { { tags: ['claimAvailableTasksMget'] } ); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).toHaveBeenCalledWith([ 'task:id-1', 'task:id-2', @@ -1130,7 +1158,7 @@ describe('TaskClaiming', () => { ]; const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkUpdate.mockResolvedValueOnce( [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2], fetchedTasks[3]].map(asOk) @@ -1184,7 +1212,10 @@ describe('TaskClaiming', () => { { tags: ['claimAvailableTasksMget'] } ); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).toHaveBeenCalledWith([ 'task:id-1', 'task:id-2', @@ -1244,7 +1275,7 @@ describe('TaskClaiming', () => { ]; const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkUpdate.mockResolvedValueOnce( [fetchedTasks[0], fetchedTasks[1], fetchedTasks[2], fetchedTasks[3]].map(asOk) @@ -1288,7 +1319,10 @@ describe('TaskClaiming', () => { { tags: ['claimAvailableTasksMget'] } ); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).toHaveBeenCalledWith([ 'task:id-1', 'task:id-2', @@ -1348,7 +1382,7 @@ describe('TaskClaiming', () => { ]; const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkUpdate.mockResolvedValueOnce([ asOk(fetchedTasks[0]), @@ -1404,7 +1438,10 @@ describe('TaskClaiming', () => { { tags: ['claimAvailableTasksMget'] } ); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).toHaveBeenCalledWith([ 'task:id-1', 'task:id-2', @@ -1464,7 +1501,7 @@ describe('TaskClaiming', () => { ]; const { versionMap, docLatestVersions } = getVersionMapsFromTasks(fetchedTasks); - store.fetch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); + store.msearch.mockResolvedValueOnce({ docs: fetchedTasks, versionMap }); store.getDocVersions.mockResolvedValueOnce(docLatestVersions); store.bulkUpdate.mockRejectedValueOnce(new Error('oh no')); store.bulkGet.mockResolvedValueOnce([]); @@ -1506,7 +1543,10 @@ describe('TaskClaiming', () => { { tags: ['claimAvailableTasksMget'] } ); - expect(store.fetch.mock.calls[0][0]).toMatchObject({ size: 40, seq_no_primary_term: true }); + expect(store.msearch.mock.calls[0][0]?.[0]).toMatchObject({ + size: 40, + seq_no_primary_term: true, + }); expect(store.getDocVersions).toHaveBeenCalledWith([ 'task:id-1', 'task:id-2', @@ -1567,13 +1607,7 @@ describe('TaskClaiming', () => { createTaskRunner: jest.fn(), }, }); - const [ - { - args: { - search: { query }, - }, - }, - ] = await testClaimAvailableTasks({ + const claimedResults = await testClaimAvailableTasks({ storeOpts: { taskManagerId, definitions, @@ -1583,6 +1617,13 @@ describe('TaskClaiming', () => { claimOwnershipUntil: new Date(), }, }); + const [ + { + args: { + search: [{ query }], + }, + }, + ] = claimedResults; expect(query).toMatchInlineSnapshot(` Object { @@ -1786,24 +1827,31 @@ describe('TaskClaiming', () => { const taskStore = taskStoreMock.create({ taskManagerId }); taskStore.convertToSavedObjectIds.mockImplementation((ids) => ids.map((id) => `task:${id}`)); for (const docs of taskCycles) { - taskStore.fetch.mockResolvedValueOnce({ docs, versionMap: new Map() }); - taskStore.updateByQuery.mockResolvedValueOnce({ - updated: docs.length, - version_conflicts: 0, - total: docs.length, + const versionMap = new Map(); + const docVersions = new Map(); + for (const doc of docs) { + const esId = `task:${doc.id}`; + versionMap.set(doc.id, { esId, seqNo: 42, primaryTerm: 666 }); + docVersions.set(esId, { esId, seqNo: 42, primaryTerm: 666 }); + } + taskStore.msearch.mockResolvedValueOnce({ docs, versionMap }); + taskStore.getDocVersions.mockResolvedValueOnce(docVersions); + const updatedDocs = docs.map((doc) => { + doc = { ...doc, retryAt: null }; + return asOk(doc); }); + taskStore.bulkUpdate.mockResolvedValueOnce(updatedDocs); + taskStore.bulkGet.mockResolvedValueOnce(updatedDocs); } - taskStore.fetch.mockResolvedValue({ docs: [], versionMap: new Map() }); - taskStore.updateByQuery.mockResolvedValue({ - updated: 0, - version_conflicts: 0, - total: 0, - }); + taskStore.msearch.mockResolvedValue({ docs: [], versionMap: new Map() }); + taskStore.getDocVersions.mockResolvedValue(new Map()); + taskStore.bulkUpdate.mockResolvedValue([]); + taskStore.bulkGet.mockResolvedValue([]); const taskClaiming = new TaskClaiming({ logger: taskManagerLogger, - strategy: 'default', + strategy: CLAIM_STRATEGY_MGET, definitions, excludedTaskTypes: [], unusedTypes: [], @@ -1817,7 +1865,21 @@ describe('TaskClaiming', () => { } test('emits an event when a task is succesfully by scheduling', async () => { - const { taskManagerId, runAt, taskClaiming } = instantiateStoreWithMockedApiResponses(); + const taskDefs = new TaskTypeDictionary(taskManagerLogger); + taskDefs.registerTaskDefinitions({ + foo: { + title: 'foo', + createTaskRunner: jest.fn(), + }, + bar: { + title: 'bar', + createTaskRunner: jest.fn(), + }, + }); + + const { taskManagerId, runAt, taskClaiming } = instantiateStoreWithMockedApiResponses({ + definitions: taskDefs, + }); const promise = taskClaiming.events .pipe( @@ -1854,7 +1916,8 @@ describe('TaskClaiming', () => { retryAt: null, scheduledAt: new Date(), traceparent: 'newParent', - }) + }), + event?.timing ) ); }); diff --git a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts index b2751803e8dc3..dce4bf66e57db 100644 --- a/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts +++ b/x-pack/plugins/task_manager/server/task_claimers/strategy_mget.ts @@ -23,15 +23,11 @@ import { TaskClaimerOpts, ClaimOwnershipResult, getEmptyClaimOwnershipResult, - isTaskTypeExcluded, + getExcludedTaskTypes, } from '.'; import { ConcreteTaskInstance, TaskStatus, ConcreteTaskInstanceVersion, TaskCost } from '../task'; import { TASK_MANAGER_TRANSACTION_TYPE } from '../task_running'; -import { - isLimited, - TASK_MANAGER_MARK_AS_CLAIMED, - TaskClaimingBatches, -} from '../queries/task_claiming'; +import { TASK_MANAGER_MARK_AS_CLAIMED } from '../queries/task_claiming'; import { TaskClaim, asTaskClaimEvent, startTaskTimer } from '../task_events'; import { shouldBeOneOf, mustBeAllOf, filterDownBy, matchesClauses } from '../queries/query_clauses'; @@ -48,6 +44,7 @@ import { import { TaskStore, SearchOpts } from '../task_store'; import { isOk, asOk } from '../lib/result_type'; +import { selectTasksByCapacity } from './lib/task_selector_by_capacity'; import { TaskPartitioner } from '../lib/task_partitioner'; interface OwnershipClaimingOpts { @@ -55,7 +52,8 @@ interface OwnershipClaimingOpts { size: number; taskTypes: Set; removedTypes: Set; - excludedTaskTypes: string[]; + getCapacity: (taskType?: string | undefined) => number; + excludedTaskTypePatterns: string[]; taskStore: TaskStore; events$: Subject; definitions: TaskTypeDictionary; @@ -113,11 +111,12 @@ async function claimAvailableTasks(opts: TaskClaimerOpts): Promise { - const searchedTypes = Array.from(taskTypes) - .concat(Array.from(removedTypes)) - .filter((type) => !isTaskTypeExcluded(excludedTaskTypes, type)); - const queryForScheduledTasks = mustBeAllOf( - // Task must be enabled - EnabledTask, - // a task type that's not excluded (may be removed or not) - OneOfTaskTypes('task.taskType', searchedTypes), - // Either a task with idle status and runAt <= now or - // status running or claiming with a retryAt <= now. - shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt), - // must have a status that isn't 'unrecognized' - RecognizedTask - ); + const excludedTaskTypes = new Set(getExcludedTaskTypes(definitions, excludedTaskTypePatterns)); + const claimPartitions = buildClaimPartitions({ + types: taskTypes, + excludedTaskTypes, + removedTypes, + getCapacity, + definitions, + }); const partitions = await taskPartitioner.getPartitions(); const sort: NonNullable = getClaimSort(definitions); - const query = matchesClauses( - queryForScheduledTasks, - filterDownBy(InactiveTasks), - tasksWithPartitions(partitions) - ); + const searches: SearchOpts[] = []; + + // not handling removed types yet + + // add search for unlimited types + if (claimPartitions.unlimitedTypes.length > 0) { + const queryForUnlimitedTasks = mustBeAllOf( + // Task must be enabled + EnabledTask, + // a task type that's not excluded (may be removed or not) + OneOfTaskTypes('task.taskType', claimPartitions.unlimitedTypes), + // Either a task with idle status and runAt <= now or + // status running or claiming with a retryAt <= now. + shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt), + // must have a status that isn't 'unrecognized' + RecognizedTask + ); + + const queryUnlimitedTasks = matchesClauses( + queryForUnlimitedTasks, + filterDownBy(InactiveTasks), + tasksWithPartitions(partitions) + ); + searches.push({ + query: queryUnlimitedTasks, + sort, // note: we could optimize this to not sort on priority, for this case + size, + seq_no_primary_term: true, + }); + } - return await taskStore.fetch( - { + // add searches for limited types + for (const [type, capacity] of claimPartitions.limitedTypes) { + const queryForLimitedTasks = mustBeAllOf( + // Task must be enabled + EnabledTask, + // Specific task type + OneOfTaskTypes('task.taskType', [type]), + // Either a task with idle status and runAt <= now or + // status running or claiming with a retryAt <= now. + shouldBeOneOf(IdleTaskWithExpiredRunAt, RunningOrClaimingTaskWithExpiredRetryAt), + // must have a status that isn't 'unrecognized' + RecognizedTask + ); + + const query = matchesClauses( + queryForLimitedTasks, + filterDownBy(InactiveTasks), + tasksWithPartitions(partitions) + ); + searches.push({ query, sort, - size, + size: capacity * SIZE_MULTIPLIER_FOR_TASK_FETCH, seq_no_primary_term: true, - }, - // limit the response size - true - ); + }); + } + + return await taskStore.msearch(searches); } -function applyLimitedConcurrency( - tasks: ConcreteTaskInstance[], - batches: TaskClaimingBatches -): ConcreteTaskInstance[] { - // create a map of task type - concurrency - const limitedBatches = batches.filter(isLimited); - const limitedMap = new Map(); - for (const limitedBatch of limitedBatches) { - const { tasksTypes, concurrency } = limitedBatch; - limitedMap.set(tasksTypes, concurrency); - } +interface ClaimPartitions { + removedTypes: string[]; + unlimitedTypes: string[]; + limitedTypes: Map; +} + +interface BuildClaimPartitionsOpts { + types: Set; + excludedTaskTypes: Set; + removedTypes: Set; + getCapacity: (taskType?: string) => number; + definitions: TaskTypeDictionary; +} + +function buildClaimPartitions(opts: BuildClaimPartitionsOpts): ClaimPartitions { + const result: ClaimPartitions = { + removedTypes: [], + unlimitedTypes: [], + limitedTypes: new Map(), + }; + + const { types, excludedTaskTypes, removedTypes, getCapacity, definitions } = opts; + for (const type of types) { + const definition = definitions.get(type); + if (definition == null) continue; + + if (excludedTaskTypes.has(type)) continue; + + if (removedTypes.has(type)) { + result.removedTypes.push(type); + continue; + } - // apply the limited concurrency - const result: ConcreteTaskInstance[] = []; - for (const task of tasks) { - const concurrency = limitedMap.get(task.taskType); - if (concurrency == null) { - result.push(task); + if (definition.maxConcurrency == null) { + result.unlimitedTypes.push(definition.type); continue; } - if (concurrency > 0) { - result.push(task); - limitedMap.set(task.taskType, concurrency - 1); + const capacity = getCapacity(definition.type) / definition.cost; + if (capacity !== 0) { + result.limitedTypes.set(definition.type, capacity); } } diff --git a/x-pack/plugins/task_manager/server/task_store.mock.ts b/x-pack/plugins/task_manager/server/task_store.mock.ts index c15518eaed510..7cf051f406532 100644 --- a/x-pack/plugins/task_manager/server/task_store.mock.ts +++ b/x-pack/plugins/task_manager/server/task_store.mock.ts @@ -33,6 +33,8 @@ export const taskStoreMock = { bulkGet: jest.fn(), bulkGetVersions: jest.fn(), getDocVersions: jest.fn(), + search: jest.fn(), + msearch: jest.fn(), index, taskManagerId, } as unknown as jest.Mocked; diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 80c1f46f53e4d..19f2861b0ed16 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -360,6 +360,141 @@ describe('TaskStore', () => { }); }); + describe('msearch', () => { + let store: TaskStore; + let esClient: ReturnType['asInternalUser']; + let childEsClient: ReturnType< + typeof elasticsearchServiceMock.createClusterClient + >['asInternalUser']; + + beforeAll(() => { + esClient = elasticsearchServiceMock.createClusterClient().asInternalUser; + childEsClient = elasticsearchServiceMock.createClusterClient().asInternalUser; + esClient.child.mockReturnValue(childEsClient as unknown as Client); + store = new TaskStore({ + logger: mockLogger(), + index: 'tasky', + taskManagerId: '', + serializer, + esClient, + definitions: taskDefinitions, + savedObjectsRepository: savedObjectsClient, + adHocTaskCounter, + allowReadingInvalidState: false, + requestTimeouts: { + update_by_query: 1000, + }, + }); + }); + + async function testMsearch( + optsArray: SearchOpts[], + hitsArray: Array> = [] + ) { + childEsClient.msearch.mockResponse({ + took: 0, + responses: hitsArray.map((hits) => ({ + hits, + took: 0, + _shards: { + failed: 0, + successful: 1, + total: 1, + }, + timed_out: false, + status: 200, + })), + }); + + const result = await store.msearch(optsArray); + + expect(childEsClient.msearch).toHaveBeenCalledTimes(1); + + return { + result, + args: childEsClient.msearch.mock.calls[0][0], + }; + } + + test('empty call filters by type, sorts by runAt and id', async () => { + const { args } = await testMsearch([{}], []); + expect(args).toMatchObject({ + index: 'tasky', + body: [ + {}, + { + sort: [{ 'task.runAt': 'asc' }], + query: { term: { type: 'task' } }, + }, + ], + }); + }); + + test('allows multiple custom queries', async () => { + const { args } = await testMsearch( + [ + { + query: { + term: { 'task.taskType': 'foo' }, + }, + }, + { + query: { + term: { 'task.taskType': 'bar' }, + }, + }, + ], + [] + ); + + expect(args).toMatchObject({ + body: [ + {}, + { + query: { + bool: { + must: [{ term: { type: 'task' } }, { term: { 'task.taskType': 'foo' } }], + }, + }, + }, + {}, + { + query: { + bool: { + must: [{ term: { type: 'task' } }, { term: { 'task.taskType': 'bar' } }], + }, + }, + }, + ], + }); + }); + + test('pushes error from call cluster to errors$', async () => { + const firstErrorPromise = store.errors$.pipe(first()).toPromise(); + childEsClient.msearch.mockResponse({ + took: 0, + responses: [ + { + took: 0, + _shards: { + failed: 0, + successful: 1, + total: 1, + }, + timed_out: false, + status: 429, + }, + ], + } as estypes.MsearchResponse); + await expect(store.msearch([{}])).rejects.toThrowErrorMatchingInlineSnapshot( + `"Unexpected status code from taskStore::msearch: 429"` + ); + expect(await firstErrorPromise).toMatchInlineSnapshot( + `[Error: Unexpected status code from taskStore::msearch: 429]` + ); + }); + }); + describe('aggregate', () => { let store: TaskStore; let esClient: ReturnType['asInternalUser']; diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 9b58d7bc3c18b..12a1f256c585b 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -41,6 +41,7 @@ import { import { TaskTypeDictionary } from './task_type_dictionary'; import { AdHocTaskCounter } from './lib/adhoc_task_counter'; import { TaskValidator } from './task_validator'; +import { claimSort } from './queries/mark_available_tasks_as_claimed'; import { MAX_PARTITIONS } from './lib/task_partitioner'; export interface StoreOpts { @@ -504,6 +505,41 @@ export class TaskStore { } } + // like search(), only runs multiple searches in parallel returning the combined results + async msearch(opts: SearchOpts[] = []): Promise { + const queries = opts.map(({ sort = [{ 'task.runAt': 'asc' }], ...opt }) => + ensureQueryOnlyReturnsTaskObjects({ sort, ...opt }) + ); + const body = queries.flatMap((query) => [{}, query]); + + const result = await this.esClientWithoutRetries.msearch({ + index: this.index, + ignore_unavailable: true, + body, + }); + const { responses } = result; + + const versionMap = this.createVersionMap([]); + let allTasks = new Array(); + + for (const response of responses) { + if (response.status !== 200) { + const err = new Error(`Unexpected status code from taskStore::msearch: ${response.status}`); + this.errors$.next(err); + throw err; + } + + const { hits } = response as estypes.MsearchMultiSearchItem; + const { hits: tasks } = hits; + this.addTasksToVersionMap(versionMap, tasks); + allTasks = allTasks.concat(this.filterTasks(tasks)); + } + + const allSortedTasks = claimSort(this.definitions, allTasks); + + return { docs: allSortedTasks, versionMap }; + } + private async search( opts: SearchOpts = {}, limitResponse: boolean = false @@ -522,27 +558,9 @@ export class TaskStore { hits: { hits: tasks }, } = result; - const versionMap = new Map(); - for (const task of tasks) { - if (task._seq_no == null || task._primary_term == null) continue; - - const esId = task._id!.startsWith('task:') ? task._id!.slice(5) : task._id!; - versionMap.set(esId, { - esId: task._id!, - seqNo: task._seq_no, - primaryTerm: task._primary_term, - }); - } - + const versionMap = this.createVersionMap(tasks); return { - docs: tasks - // @ts-expect-error @elastic/elasticsearch _source is optional - .filter((doc) => this.serializer.isRawSavedObject(doc)) - // @ts-expect-error @elastic/elasticsearch _source is optional - .map((doc) => this.serializer.rawToSavedObject(doc)) - .map((doc) => omit(doc, 'namespace') as SavedObject) - .map((doc) => savedObjectToConcreteTaskInstance(doc)) - .filter((doc): doc is ConcreteTaskInstance => !!doc), + docs: this.filterTasks(tasks), versionMap, }; } catch (e) { @@ -551,6 +569,45 @@ export class TaskStore { } } + private filterTasks( + tasks: Array> + ): ConcreteTaskInstance[] { + return ( + tasks + // @ts-expect-error @elastic/elasticsearch _source is optional + .filter((doc) => this.serializer.isRawSavedObject(doc)) + // @ts-expect-error @elastic/elasticsearch _source is optional + .map((doc) => this.serializer.rawToSavedObject(doc)) + .map((doc) => omit(doc, 'namespace') as SavedObject) + .map((doc) => savedObjectToConcreteTaskInstance(doc)) + .filter((doc): doc is ConcreteTaskInstance => !!doc) + ); + } + + private addTasksToVersionMap( + versionMap: Map, + tasks: Array> + ): void { + for (const task of tasks) { + if (task._id == null || task._seq_no == null || task._primary_term == null) continue; + + const esId = task._id.startsWith('task:') ? task._id.slice(5) : task._id; + versionMap.set(esId, { + esId: task._id, + seqNo: task._seq_no, + primaryTerm: task._primary_term, + }); + } + } + + private createVersionMap( + tasks: Array> + ): Map { + const versionMap = new Map(); + this.addTasksToVersionMap(versionMap, tasks); + return versionMap; + } + public async aggregate({ aggs, query,