Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ jest.mock('../constants', () => ({
'limitedToTwo',
'limitedToFive',
'yawn',
'sampleTaskSharedConcurrencyType1',
'sampleTaskSharedConcurrencyType2',
'sampleTaskZeroMaxConcurrency',
],
}));

Expand Down Expand Up @@ -2063,6 +2066,50 @@ describe('TaskClaiming', () => {
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
});

test('should not claim the tasks that has 0 maxConcurrency (pollEnabled:false)', async () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
title: 'foo',
createTaskRunner: jest.fn(),
},
bar: {
title: 'bar',
createTaskRunner: jest.fn(),
},
baz: {
title: 'baz',
createTaskRunner: jest.fn(),
},
sampleTaskZeroMaxConcurrency: {
title: 'report',
createTaskRunner: jest.fn(),
maxConcurrency: 0,
},
});
const { taskClaiming, store } = initialiseTestClaiming({
storeOpts: {
definitions,
},
taskClaimingOpts: {
maxAttempts: 5,
},
});

await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({
claimOwnershipUntil: new Date(),
});

const searchQuery = store.msearch.mock.calls[0]?.[0]?.[0].query;
const searchQueryMust = searchQuery?.bool?.must;

expect(Array.isArray(searchQueryMust) && searchQueryMust[1]).toEqual({
bool: { must: [{ terms: { 'task.taskType': ['foo', 'bar', 'baz'] } }] },
});

expect(JSON.stringify(searchQuery)).not.toContain('sampleTaskZeroMaxConcurrency');
});
});

describe('task events', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,26 @@ describe('CostCapacity', () => {
).toBe(4);
});

test('availableCapacity returns 0 for the task type when task type with maxConcurrency is 0', () => {
const pool = new CostCapacity({ capacity$: of(10), logger });

const tasksInPool = new Map([
['1', { ...mockTask({}, { type: 'type1' }) }],
['2', { ...mockTask({}, { cost: TaskCost.Tiny }) }],
['3', { ...mockTask() }],
]);

expect(
pool.availableCapacity(tasksInPool, {
type: 'type1',
maxConcurrency: 0,
cost: TaskCost.Normal,
createTaskRunner: jest.fn(),
timeout: '5m',
})
).toBe(0);
});

describe('determineTasksToRunBasedOnCapacity', () => {
test('runs all tasks if there is capacity', () => {
const pool = new CostCapacity({ capacity$: of(10), logger });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
* 2.0.
*/

import { Logger } from '@kbn/core/server';
import type { Logger } from '@kbn/core/server';
import { isNullish } from 'utility-types';
import { DEFAULT_CAPACITY } from '../config';
import { TaskDefinition } from '../task';
import { TaskRunner } from '../task_running';
import { CapacityOpts, ICapacity } from './types';
import type { TaskDefinition } from '../task';
import type { TaskRunner } from '../task_running';
import type { CapacityOpts, ICapacity } from './types';
import { getCapacityInCost } from './utils';

export class CostCapacity implements ICapacity {
Expand Down Expand Up @@ -69,7 +70,7 @@ export class CostCapacity implements ICapacity {
taskDefinition?: TaskDefinition | null
): number {
const allAvailableCapacity = this.capacity - this.usedCapacity(tasksInPool);
if (taskDefinition && taskDefinition.maxConcurrency) {
if (taskDefinition && !isNullish(taskDefinition.maxConcurrency)) {
// calculate the max capacity that can be used for this task type based on cost
const maxCapacityForType = taskDefinition.maxConcurrency * taskDefinition.cost;
return Math.max(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
* 2.0.
*/

import { Logger } from '@kbn/core/server';
import { TaskRunner } from '../task_running';
import { CapacityOpts, ICapacity } from './types';
import { TaskDefinition } from '../task';
import type { Logger } from '@kbn/core/server';
import { isNullish } from 'utility-types';
import type { TaskRunner } from '../task_running';
import type { CapacityOpts, ICapacity } from './types';
import type { TaskDefinition } from '../task';
import { getCapacityInWorkers } from './utils';

export class WorkerCapacity implements ICapacity {
Expand Down Expand Up @@ -60,7 +61,7 @@ export class WorkerCapacity implements ICapacity {
taskDefinition?: TaskDefinition | null
): number {
const allAvailableCapacity = this.capacity - this.usedCapacity(tasksInPool);
if (taskDefinition && taskDefinition.maxConcurrency) {
if (taskDefinition && !isNullish(taskDefinition.maxConcurrency)) {
// calculate the max workers that can be used for this task type
return Math.max(
Math.min(
Expand Down