Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ jest.mock('../constants', () => ({
'yawn',
'sampleTaskSharedConcurrencyType1',
'sampleTaskSharedConcurrencyType2',
'sampleTaskZeroMaxConcurrency',
],
}));

Expand Down Expand Up @@ -2347,6 +2348,50 @@ describe('TaskClaiming', () => {
{ tags: ['taskClaiming', 'claimAvailableTasksMget'] }
);
});

test('should not claim the tasks that has 0 maxConcurrency (pollEnabled:false)', async () => {
const definitions = new TaskTypeDictionary(mockLogger());
definitions.registerTaskDefinitions({
foo: {
title: 'foo',
createTaskRunner: jest.fn(),
},
bar: {
title: 'bar',
createTaskRunner: jest.fn(),
},
baz: {
title: 'baz',
createTaskRunner: jest.fn(),
},
sampleTaskZeroMaxConcurrency: {
title: 'report',
createTaskRunner: jest.fn(),
maxConcurrency: 0,
},
});
const { taskClaiming, store } = initialiseTestClaiming({
storeOpts: {
definitions,
},
taskClaimingOpts: {
maxAttempts: 5,
},
});

await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({
claimOwnershipUntil: new Date(),
});

const searchQuery = store.msearch.mock.calls[0]?.[0]?.[0].query;
const searchQueryMust = searchQuery?.bool?.must;

expect(Array.isArray(searchQueryMust) && searchQueryMust[1]).toEqual({
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe another expect clause that the query does not contain sampleTaskZeroMaxConcurrency?

bool: { must: [{ terms: { 'task.taskType': ['foo', 'bar', 'baz'] } }] },
});

expect(JSON.stringify(searchQuery)).not.toContain('sampleTaskZeroMaxConcurrency');
});
});

describe('task events', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,26 @@ describe('CostCapacity', () => {
).toBe(4);
});

test('availableCapacity returns 0 for the task type when task type with maxConcurrency is 0', () => {
const pool = new CostCapacity({ capacity$: of(10), logger });

const tasksInPool = new Map([
['1', { ...mockTask({}, { type: 'type1' }) }],
['2', { ...mockTask({}, { cost: TaskCost.Tiny }) }],
['3', { ...mockTask() }],
]);

expect(
pool.availableCapacity(tasksInPool, {
type: 'type1',
maxConcurrency: 0,
cost: TaskCost.Normal,
createTaskRunner: jest.fn(),
timeout: '5m',
})
).toBe(0);
});

describe('determineTasksToRunBasedOnCapacity', () => {
test('runs all tasks if there is capacity', () => {
const pool = new CostCapacity({ capacity$: of(10), logger });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import type { Logger } from '@kbn/core/server';
import { isNullish } from 'utility-types';
import { DEFAULT_CAPACITY } from '../config';
import type { TaskDefinition } from '../task';
import type { TaskRunner } from '../task_running';
Expand Down Expand Up @@ -69,7 +70,7 @@ export class CostCapacity implements ICapacity {
taskDefinition?: TaskDefinition | null
): number {
const allAvailableCapacity = this.capacity - this.usedCapacity(tasksInPool);
if (taskDefinition && taskDefinition.maxConcurrency) {
if (taskDefinition && !isNullish(taskDefinition.maxConcurrency)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could probably add a unit test for when maxConcurrency is 0 as well

// calculate the max capacity that can be used for this task type based on cost
const maxCapacityForType = taskDefinition.maxConcurrency * taskDefinition.cost;
return Math.max(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import type { Logger } from '@kbn/core/server';
import { isNullish } from 'utility-types';
import type { TaskRunner } from '../task_running';
import type { CapacityOpts, ICapacity } from './types';
import type { TaskDefinition } from '../task';
Expand Down Expand Up @@ -60,7 +61,7 @@ export class WorkerCapacity implements ICapacity {
taskDefinition?: TaskDefinition | null
): number {
const allAvailableCapacity = this.capacity - this.usedCapacity(tasksInPool);
if (taskDefinition && taskDefinition.maxConcurrency) {
if (taskDefinition && !isNullish(taskDefinition.maxConcurrency)) {
// calculate the max workers that can be used for this task type
return Math.max(
Math.min(
Expand Down