diff --git a/x-pack/platform/plugins/shared/task_manager/server/task_claimers/strategy_mget.test.ts b/x-pack/platform/plugins/shared/task_manager/server/task_claimers/strategy_mget.test.ts index cae4112683181..43a863e2b4bb7 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/task_claimers/strategy_mget.test.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/task_claimers/strategy_mget.test.ts @@ -52,6 +52,9 @@ jest.mock('../constants', () => ({ 'limitedToTwo', 'limitedToFive', 'yawn', + 'sampleTaskSharedConcurrencyType1', + 'sampleTaskSharedConcurrencyType2', + 'sampleTaskZeroMaxConcurrency', ], })); @@ -2063,6 +2066,50 @@ describe('TaskClaiming', () => { { tags: ['taskClaiming', 'claimAvailableTasksMget'] } ); }); + + test('should not claim the tasks that has 0 maxConcurrency (pollEnabled:false)', async () => { + const definitions = new TaskTypeDictionary(mockLogger()); + definitions.registerTaskDefinitions({ + foo: { + title: 'foo', + createTaskRunner: jest.fn(), + }, + bar: { + title: 'bar', + createTaskRunner: jest.fn(), + }, + baz: { + title: 'baz', + createTaskRunner: jest.fn(), + }, + sampleTaskZeroMaxConcurrency: { + title: 'report', + createTaskRunner: jest.fn(), + maxConcurrency: 0, + }, + }); + const { taskClaiming, store } = initialiseTestClaiming({ + storeOpts: { + definitions, + }, + taskClaimingOpts: { + maxAttempts: 5, + }, + }); + + await taskClaiming.claimAvailableTasksIfCapacityIsAvailable({ + claimOwnershipUntil: new Date(), + }); + + const searchQuery = store.msearch.mock.calls[0]?.[0]?.[0].query; + const searchQueryMust = searchQuery?.bool?.must; + + expect(Array.isArray(searchQueryMust) && searchQueryMust[1]).toEqual({ + bool: { must: [{ terms: { 'task.taskType': ['foo', 'bar', 'baz'] } }] }, + }); + + expect(JSON.stringify(searchQuery)).not.toContain('sampleTaskZeroMaxConcurrency'); + }); }); describe('task events', () => { diff --git a/x-pack/platform/plugins/shared/task_manager/server/task_pool/cost_capacity.test.ts b/x-pack/platform/plugins/shared/task_manager/server/task_pool/cost_capacity.test.ts index 31fd90b8c6ec6..13b511c735a59 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/task_pool/cost_capacity.test.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/task_pool/cost_capacity.test.ts @@ -130,6 +130,26 @@ describe('CostCapacity', () => { ).toBe(4); }); + test('availableCapacity returns 0 for the task type when task type with maxConcurrency is 0', () => { + const pool = new CostCapacity({ capacity$: of(10), logger }); + + const tasksInPool = new Map([ + ['1', { ...mockTask({}, { type: 'type1' }) }], + ['2', { ...mockTask({}, { cost: TaskCost.Tiny }) }], + ['3', { ...mockTask() }], + ]); + + expect( + pool.availableCapacity(tasksInPool, { + type: 'type1', + maxConcurrency: 0, + cost: TaskCost.Normal, + createTaskRunner: jest.fn(), + timeout: '5m', + }) + ).toBe(0); + }); + describe('determineTasksToRunBasedOnCapacity', () => { test('runs all tasks if there is capacity', () => { const pool = new CostCapacity({ capacity$: of(10), logger }); diff --git a/x-pack/platform/plugins/shared/task_manager/server/task_pool/cost_capacity.ts b/x-pack/platform/plugins/shared/task_manager/server/task_pool/cost_capacity.ts index 29e214b63bc41..3b87d7bfab77a 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/task_pool/cost_capacity.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/task_pool/cost_capacity.ts @@ -5,11 +5,12 @@ * 2.0. */ -import { Logger } from '@kbn/core/server'; +import type { Logger } from '@kbn/core/server'; +import { isNullish } from 'utility-types'; import { DEFAULT_CAPACITY } from '../config'; -import { TaskDefinition } from '../task'; -import { TaskRunner } from '../task_running'; -import { CapacityOpts, ICapacity } from './types'; +import type { TaskDefinition } from '../task'; +import type { TaskRunner } from '../task_running'; +import type { CapacityOpts, ICapacity } from './types'; import { getCapacityInCost } from './utils'; export class CostCapacity implements ICapacity { @@ -69,7 +70,7 @@ export class CostCapacity implements ICapacity { taskDefinition?: TaskDefinition | null ): number { const allAvailableCapacity = this.capacity - this.usedCapacity(tasksInPool); - if (taskDefinition && taskDefinition.maxConcurrency) { + if (taskDefinition && !isNullish(taskDefinition.maxConcurrency)) { // calculate the max capacity that can be used for this task type based on cost const maxCapacityForType = taskDefinition.maxConcurrency * taskDefinition.cost; return Math.max( diff --git a/x-pack/platform/plugins/shared/task_manager/server/task_pool/worker_capacity.ts b/x-pack/platform/plugins/shared/task_manager/server/task_pool/worker_capacity.ts index 8363c53f58ec1..dd9b56b5a4b8d 100644 --- a/x-pack/platform/plugins/shared/task_manager/server/task_pool/worker_capacity.ts +++ b/x-pack/platform/plugins/shared/task_manager/server/task_pool/worker_capacity.ts @@ -5,10 +5,11 @@ * 2.0. */ -import { Logger } from '@kbn/core/server'; -import { TaskRunner } from '../task_running'; -import { CapacityOpts, ICapacity } from './types'; -import { TaskDefinition } from '../task'; +import type { Logger } from '@kbn/core/server'; +import { isNullish } from 'utility-types'; +import type { TaskRunner } from '../task_running'; +import type { CapacityOpts, ICapacity } from './types'; +import type { TaskDefinition } from '../task'; import { getCapacityInWorkers } from './utils'; export class WorkerCapacity implements ICapacity { @@ -60,7 +61,7 @@ export class WorkerCapacity implements ICapacity { taskDefinition?: TaskDefinition | null ): number { const allAvailableCapacity = this.capacity - this.usedCapacity(tasksInPool); - if (taskDefinition && taskDefinition.maxConcurrency) { + if (taskDefinition && !isNullish(taskDefinition.maxConcurrency)) { // calculate the max workers that can be used for this task type return Math.max( Math.min(