Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import {
RecognizedTask,
OneOfTaskTypes,
tasksWithPartitions,
claimSort,
} from './mark_available_tasks_as_claimed';

import { TaskStatus, TaskPriority, ConcreteTaskInstance } from '../task';

import { TaskTypeDictionary } from '../task_type_dictionary';
import { mockLogger } from '../test_utils';

Expand Down Expand Up @@ -304,4 +307,129 @@ if (doc['task.runAt'].size()!=0) {
}
`);
});

// Tests sorting 3 tasks with different priorities, runAt/retryAt values
// running the sort over all permutations of them.
describe('claimSort', () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
normalPriorityTask: {
title: 'normal priority',
createTaskRunner: () => ({ run: () => Promise.resolve() }),
priority: TaskPriority.Normal, // 50
},
noPriorityTask: {
title: 'no priority',
createTaskRunner: () => ({ run: () => Promise.resolve() }),
priority: undefined, // 50
},
lowPriorityTask: {
title: 'low priority',
createTaskRunner: () => ({ run: () => Promise.resolve() }),
priority: TaskPriority.Low, // 1
},
});

// possible ordering of tasks before sort
const permutations = [
[0, 1, 2],
[0, 2, 1],
[1, 0, 2],
[1, 2, 0],
[2, 0, 1],
[2, 1, 0],
];

test('works correctly with same dates, different priorities', () => {
const date = new Date();
const baseTasks: ConcreteTaskInstance[] = [];

// push in reverse order
baseTasks.push(buildTaskInstance({ taskType: 'lowPriorityTask', runAt: date }));
baseTasks.push(buildTaskInstance({ taskType: 'noPriorityTask', runAt: date }));
baseTasks.push(buildTaskInstance({ taskType: 'normalPriorityTask', runAt: date }));

for (const perm of permutations) {
const tasks = [baseTasks[perm[0]], baseTasks[perm[1]], baseTasks[perm[2]]];
const sorted = claimSort(definitions, tasks);
// all we know is low should be last
expect(sorted[2]).toBe(baseTasks[0]);
}
});

test('works correctly with same priorities, different dates', () => {
const baseDate = new Date('2024-07-29T00:00:00Z').valueOf();
const baseTasks: ConcreteTaskInstance[] = [];

// push in reverse order
baseTasks.push(
buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate + 1000) })
);
baseTasks.push(buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate) }));
baseTasks.push(
buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate - 1000) })
);

for (const perm of permutations) {
const tasks = [baseTasks[perm[0]], baseTasks[perm[1]], baseTasks[perm[2]]];
const sorted = claimSort(definitions, tasks);
expect(sorted[0]).toBe(baseTasks[2]);
expect(sorted[1]).toBe(baseTasks[1]);
expect(sorted[2]).toBe(baseTasks[0]);
}
});

test('works correctly with mixed of runAt and retryAt values', () => {
const baseDate = new Date('2024-07-29T00:00:00Z').valueOf();
const baseTasks: ConcreteTaskInstance[] = [];

// push in reverse order
baseTasks.push(
buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate + 1000) })
);
baseTasks.push(
buildTaskInstance({
taskType: 'noPriorityTask',
runAt: new Date(baseDate - 2000),
retryAt: new Date(baseDate), // should use this value
})
);
baseTasks.push(
buildTaskInstance({ taskType: 'noPriorityTask', runAt: new Date(baseDate - 1000) })
);

for (const perm of permutations) {
const tasks = [baseTasks[perm[0]], baseTasks[perm[1]], baseTasks[perm[2]]];
const sorted = claimSort(definitions, tasks);
expect(sorted[0]).toBe(baseTasks[2]);
expect(sorted[1]).toBe(baseTasks[1]);
expect(sorted[2]).toBe(baseTasks[0]);
}
});
});
});

interface BuildTaskOpts {
taskType: string;
runAt: Date;
retryAt?: Date;
}

let id = 1;

function buildTaskInstance(opts: BuildTaskOpts): ConcreteTaskInstance {
const { taskType, runAt, retryAt } = opts;
return {
taskType,
id: `${id++}`,
runAt,
retryAt: retryAt || null,
scheduledAt: runAt,
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
state: {},
params: {},
ownerId: null,
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/
import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { TaskTypeDictionary } from '../task_type_dictionary';
import { TaskStatus, TaskPriority } from '../task';
import { TaskStatus, TaskPriority, ConcreteTaskInstance } from '../task';
import {
ScriptBasedSortClause,
ScriptClause,
Expand All @@ -15,23 +15,6 @@ import {
MustNotCondition,
} from './query_clauses';

export function taskWithLessThanMaxAttempts(type: string, maxAttempts: number): MustCondition {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed a few lingering references to search-related things regarding tasks running too many attempts. I believe this got resolved in #152841; though not sure if that applies to recurring tasks. @mikecote @ymao1 ??? In any case, this function was no longer being used, so figured I might as well delete it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I don't think we enforced anything with max attempts for recurring task types.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 shouldn't be used for recurring tasks, only ad-hoc (one time) tasks

return {
bool: {
must: [
{ term: { 'task.taskType': type } },
{
range: {
'task.attempts': {
lt: maxAttempts,
},
},
},
],
},
};
}

export function tasksOfType(taskTypes: string[]): estypes.QueryDslQueryContainer {
return {
bool: {
Expand Down Expand Up @@ -166,12 +149,53 @@ function getSortByPriority(definitions: TaskTypeDictionary): estypes.SortCombina
};
}

// getClaimSort() is used to generate sort bits for the ES query
// should align with claimSort() below
export function getClaimSort(definitions: TaskTypeDictionary): estypes.SortCombinations[] {
const sortByPriority = getSortByPriority(definitions);
if (!sortByPriority) return [SortByRunAtAndRetryAt];
return [sortByPriority, SortByRunAtAndRetryAt];
}

// claimSort() is used to sort tasks returned from a claimer by priority and date.
// Kept here so it should align with getClaimSort() above.
// Returns a copy of the tasks passed in.
export function claimSort(
definitions: TaskTypeDictionary,
tasks: ConcreteTaskInstance[]
): ConcreteTaskInstance[] {
const priorityMap: Record<string, TaskPriority> = {};
tasks.forEach((task) => {
const taskType = task.taskType;
const priority = getPriority(definitions, taskType);
priorityMap[taskType] = priority;
});

return tasks.slice().sort(compare);

function compare(a: ConcreteTaskInstance, b: ConcreteTaskInstance) {
// sort by priority, descending
const priorityA = priorityMap[a.taskType] ?? TaskPriority.Normal;
const priorityB = priorityMap[b.taskType] ?? TaskPriority.Normal;

if (priorityA > priorityB) return -1;
if (priorityA < priorityB) return 1;

// then sort by retry/runAt, ascending
const runA = a.retryAt?.valueOf() ?? a.runAt.valueOf() ?? 0;
const runB = b.retryAt?.valueOf() ?? b.runAt.valueOf() ?? 0;

if (runA < runB) return -1;
if (runA > runB) return 1;

return 0;
}
}

function getPriority(definitions: TaskTypeDictionary, taskType: string): TaskPriority {
return definitions.get(taskType)?.priority ?? TaskPriority.Normal;
}

export interface UpdateFieldsAndMarkAsFailedOpts {
fieldUpdates: {
[field: string]: string | number | Date;
Expand Down
9 changes: 9 additions & 0 deletions x-pack/plugins/task_manager/server/task_claimers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,12 @@ export function isTaskTypeExcluded(excludedTaskTypePatterns: string[], taskType:

return false;
}

export function getExcludedTaskTypes(
definitions: TaskTypeDictionary,
excludedTaskTypePatterns: string[]
) {
return definitions
.getAllTypes()
.filter((taskType) => isTaskTypeExcluded(excludedTaskTypePatterns, taskType));
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { ConcreteTaskInstance } from '../../task';
import { isLimited, TaskClaimingBatches } from '../../queries/task_claiming';

// given a list of tasks and capacity info, select the tasks that meet capacity
export function selectTasksByCapacity(
tasks: ConcreteTaskInstance[],
batches: TaskClaimingBatches
): ConcreteTaskInstance[] {
// create a map of task type - concurrency
const limitedBatches = batches.filter(isLimited);
const limitedMap = new Map<string, number>();
for (const limitedBatch of limitedBatches) {
const { tasksTypes, concurrency } = limitedBatch;
limitedMap.set(tasksTypes, concurrency);
}

// apply the limited concurrency
const result: ConcreteTaskInstance[] = [];
for (const task of tasks) {
const concurrency = limitedMap.get(task.taskType);
if (concurrency == null) {
result.push(task);
continue;
}

if (concurrency > 0) {
result.push(task);
limitedMap.set(task.taskType, concurrency - 1);
}
}

return result;
}
Loading