Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ describe('EphemeralTaskLifecycle', () => {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ describe('managed configuration', () => {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand Down Expand Up @@ -205,7 +205,7 @@ describe('managed configuration', () => {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

import { v4 as uuidV4 } from 'uuid';
import { ElasticsearchClient } from '@kbn/core/server';
import { TaskManagerPlugin, TaskManagerStartContract } from '../plugin';
import { injectTask, retry, setupTestServers } from './lib';
import { TestElasticsearchUtils, TestKibanaUtils } from '@kbn/core-test-helpers-kbn-server';
import { ConcreteTaskInstance, TaskStatus } from '../task';
import { CreateWorkloadAggregatorOpts } from '../monitoring/workload_statistics';

const taskManagerStartSpy = jest.spyOn(TaskManagerPlugin.prototype, 'start');

const { createWorkloadAggregator: createWorkloadAggregatorMock } = jest.requireMock(
'../monitoring/workload_statistics'
);
jest.mock('../monitoring/workload_statistics', () => {
const actual = jest.requireActual('../monitoring/workload_statistics');
return {
...actual,
createWorkloadAggregator: jest.fn().mockImplementation((opts) => {
return new actual.createWorkloadAggregator(opts);
}),
};
});

describe('unrecognized task types', () => {
let esServer: TestElasticsearchUtils;
let kibanaServer: TestKibanaUtils;
let taskManagerPlugin: TaskManagerStartContract;
let createWorkloadAggregatorOpts: CreateWorkloadAggregatorOpts;

const taskIdsToRemove: string[] = [];

beforeAll(async () => {
const setupResult = await setupTestServers({
xpack: {
task_manager: {
monitored_aggregated_stats_refresh_rate: 5000,
},
},
});
esServer = setupResult.esServer;
kibanaServer = setupResult.kibanaServer;

expect(taskManagerStartSpy).toHaveBeenCalledTimes(1);
taskManagerPlugin = taskManagerStartSpy.mock.results[0].value;

expect(createWorkloadAggregatorMock).toHaveBeenCalledTimes(1);
createWorkloadAggregatorOpts = createWorkloadAggregatorMock.mock.calls[0][0];
});

afterAll(async () => {
if (kibanaServer) {
await kibanaServer.stop();
}
if (esServer) {
await esServer.stop();
}
});

beforeEach(async () => {
jest.clearAllMocks();
});

afterEach(async () => {
while (taskIdsToRemove.length > 0) {
const id = taskIdsToRemove.pop();
await taskManagerPlugin.removeIfExists(id!);
}
});

test('should be no workload aggregator errors when there are removed task types', async () => {
const errorLogSpy = jest.spyOn(createWorkloadAggregatorOpts.logger, 'error');
const removeTypeId = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: removeTypeId,
taskType: 'sampleTaskRemovedType',
params: {},
state: { foo: 'test' },
stateVersion: 1,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});
const notRegisteredTypeId = uuidV4();
await injectTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser, {
id: notRegisteredTypeId,
taskType: 'sampleTaskNotRegisteredType',
params: {},
state: { foo: 'test' },
stateVersion: 1,
runAt: new Date(),
enabled: true,
scheduledAt: new Date(),
attempts: 0,
status: TaskStatus.Idle,
startedAt: null,
retryAt: null,
ownerId: null,
});

taskIdsToRemove.push(removeTypeId);
taskIdsToRemove.push(notRegisteredTypeId);

await retry(async () => {
const task = await getTask(kibanaServer.coreStart.elasticsearch.client.asInternalUser);
expect(task?._source?.task?.status).toBe('unrecognized');
});

// monitored_aggregated_stats_refresh_rate is set to the minimum of 5 seconds
// so we want to wait that long to let it refresh
await new Promise((r) => setTimeout(r, 5100));

expect(errorLogSpy).not.toHaveBeenCalled();
});
});

async function getTask(esClient: ElasticsearchClient) {
const response = await esClient.search<{ task: ConcreteTaskInstance }>({
index: '.kibana_task_manager',
body: {
query: {
bool: {
filter: [
{
term: {
'task.taskType': 'sampleTaskRemovedType',
},
},
],
},
},
},
});

return response.hits.hits[0];
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const config = {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand All @@ -78,7 +78,7 @@ const getStatsWithTimestamp = ({
timestamp,
value: {
capacity: { config: 10, as_cost: 20, as_workers: 10 },
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_running_average_window: 50,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
status: HealthStatus.OK,
value: {
capacity: { config: 10, as_cost: 20, as_workers: 10 },
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 3000,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const config: TaskManagerConfig = {
},
version_conflict_threshold: 80,
worker_utilization_running_average_window: 5,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ function mockStats(
timestamp: new Date().toISOString(),
value: {
capacity: { config: 10, as_cost: 20, as_workers: 10 },
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 0,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ describe('Configuration Statistics Aggregator', () => {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand All @@ -75,7 +75,7 @@ describe('Configuration Statistics Aggregator', () => {
as_workers: 10,
as_cost: 20,
},
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 6000000,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand All @@ -94,7 +94,7 @@ describe('Configuration Statistics Aggregator', () => {
as_workers: 8,
as_cost: 16,
},
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 6000000,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand All @@ -113,7 +113,7 @@ describe('Configuration Statistics Aggregator', () => {
as_workers: 8,
as_cost: 16,
},
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 3000,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type ScheduledIntervals = ScheduleDensityResult['histogram']['buckets'][0];
// Set an upper bound just in case a customer sets a really high refresh rate
const MAX_SCHEDULE_DENSITY_BUCKETS = 50;

interface CreateWorkloadAggregatorOpts {
export interface CreateWorkloadAggregatorOpts {
taskStore: TaskStore;
elasticsearchAndSOAvailability$: Observable<boolean>;
refreshInterval: number;
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ const pluginInitializerContextParams = {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ describe('TaskPollingLifecycle', () => {
},
worker_utilization_running_average_window: 5,
metrics_reset_interval: 3000,
claim_strategy: 'default',
claim_strategy: 'update_by_query',
request_timeouts: {
update_by_query: 1000,
},
Expand Down
2 changes: 1 addition & 1 deletion x-pack/plugins/task_manager/server/routes/health.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ function mockHealthStats(overrides = {}) {
timestamp: new Date().toISOString(),
value: {
capacity: { config: 10, as_cost: 20, as_workers: 10 },
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 3000,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ function getMockMonitoredHealth(overrides = {}): MonitoredHealth {
status: HealthStatus.OK,
value: {
capacity: { config: 10, as_cost: 20, as_workers: 10 },
claim_strategy: 'default',
claim_strategy: 'update_by_query',
poll_interval: 3000,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
Expand Down