Skip to content

Commit 89250be

Browse files
authored
[Task Manager] Tests for the ability to run tasks of varying durations in parallel (#51572) (#51701)
This PR adds a test that ensures Task Manager is capable of picking up new tasks in parallel to a long running tasks that might otherwise hold up task execution. This doesn't add functionality - just a missing test case.
1 parent ef43d3d commit 89250be

File tree

3 files changed

+125
-16
lines changed

3 files changed

+125
-16
lines changed

x-pack/test/plugin_api_integration/plugins/task_manager/index.js

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,20 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7+
const { EventEmitter } = require('events');
8+
79
import { initRoutes } from './init_routes';
810

11+
12+
const once = function (emitter, event) {
13+
return new Promise(resolve => {
14+
emitter.once(event, resolve);
15+
});
16+
};
17+
918
export default function TaskTestingAPI(kibana) {
19+
const taskTestingEvents = new EventEmitter();
20+
1021
return new kibana.Plugin({
1122
name: 'sampleTask',
1223
require: ['elasticsearch', 'task_manager'],
@@ -52,6 +63,10 @@ export default function TaskTestingAPI(kibana) {
5263
refresh: true,
5364
});
5465

66+
if (params.waitForEvent) {
67+
await once(taskTestingEvents, params.waitForEvent);
68+
}
69+
5570
return {
5671
state: { count: (prevState.count || 0) + 1 },
5772
runAt: millisecondsFromNow(params.nextRunMilliseconds),
@@ -88,7 +103,7 @@ export default function TaskTestingAPI(kibana) {
88103
},
89104
});
90105

91-
initRoutes(server);
106+
initRoutes(server, taskTestingEvents);
92107
},
93108
});
94109
}

x-pack/test/plugin_api_integration/plugins/task_manager/init_routes.js

Lines changed: 59 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,44 @@ const taskManagerQuery = {
2323
}
2424
};
2525

26-
export function initRoutes(server) {
26+
export function initRoutes(server, taskTestingEvents) {
2727
const taskManager = server.plugins.task_manager;
2828

2929
server.route({
30-
path: '/api/sample_tasks',
30+
path: '/api/sample_tasks/schedule',
31+
method: 'POST',
32+
config: {
33+
validate: {
34+
payload: Joi.object({
35+
task: Joi.object({
36+
taskType: Joi.string().required(),
37+
interval: Joi.string().optional(),
38+
params: Joi.object().required(),
39+
state: Joi.object().optional(),
40+
id: Joi.string().optional()
41+
})
42+
}),
43+
},
44+
},
45+
async handler(request) {
46+
try {
47+
const { task: taskFields } = request.payload;
48+
const task = {
49+
...taskFields,
50+
scope: [scope],
51+
};
52+
53+
const taskResult = await (taskManager.schedule(task, { request }));
54+
55+
return taskResult;
56+
} catch (err) {
57+
return err;
58+
}
59+
},
60+
});
61+
62+
server.route({
63+
path: '/api/sample_tasks/ensure_scheduled',
3164
method: 'POST',
3265
config: {
3366
validate: {
@@ -38,26 +71,19 @@ export function initRoutes(server) {
3871
params: Joi.object().required(),
3972
state: Joi.object().optional(),
4073
id: Joi.string().optional()
41-
}),
42-
ensureScheduled: Joi.boolean()
43-
.default(false)
44-
.optional(),
74+
})
4575
}),
4676
},
4777
},
4878
async handler(request) {
4979
try {
50-
const { ensureScheduled = false, task: taskFields } = request.payload;
80+
const { task: taskFields } = request.payload;
5181
const task = {
5282
...taskFields,
5383
scope: [scope],
5484
};
5585

56-
const taskResult = await (
57-
ensureScheduled
58-
? taskManager.ensureScheduled(task, { request })
59-
: taskManager.schedule(task, { request })
60-
);
86+
const taskResult = await (taskManager.ensureScheduled(task, { request }));
6187

6288
return taskResult;
6389
} catch (err) {
@@ -66,6 +92,27 @@ export function initRoutes(server) {
6692
},
6793
});
6894

95+
server.route({
96+
path: '/api/sample_tasks/event',
97+
method: 'POST',
98+
config: {
99+
validate: {
100+
payload: Joi.object({
101+
event: Joi.string().required()
102+
}),
103+
},
104+
},
105+
async handler(request) {
106+
try {
107+
const { event } = request.payload;
108+
taskTestingEvents.emit(event);
109+
return { event };
110+
} catch (err) {
111+
return err;
112+
}
113+
},
114+
});
115+
69116
server.route({
70117
path: '/api/sample_tasks',
71118
method: 'GET',

x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,21 +58,28 @@ export default function ({ getService }) {
5858
}
5959

6060
function scheduleTask(task) {
61-
return supertest.post('/api/sample_tasks')
61+
return supertest.post('/api/sample_tasks/schedule')
6262
.set('kbn-xsrf', 'xxx')
6363
.send({ task })
6464
.expect(200)
6565
.then((response) => response.body);
6666
}
6767

6868
function scheduleTaskIfNotExists(task) {
69-
return supertest.post('/api/sample_tasks')
69+
return supertest.post('/api/sample_tasks/ensure_scheduled')
7070
.set('kbn-xsrf', 'xxx')
71-
.send({ task, ensureScheduled: true })
71+
.send({ task })
7272
.expect(200)
7373
.then((response) => response.body);
7474
}
7575

76+
function releaseTasksWaitingForEventToComplete(event) {
77+
return supertest.post('/api/sample_tasks/event')
78+
.set('kbn-xsrf', 'xxx')
79+
.send({ event })
80+
.expect(200);
81+
}
82+
7683
it('should support middleware', async () => {
7784
const historyItem = _.random(1, 100);
7885

@@ -204,5 +211,45 @@ export default function ({ getService }) {
204211
expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.greaterThan(expectedDiff - buffer);
205212
expect(Date.parse(currentTask.runAt) - originalRunAt).to.be.lessThan(expectedDiff + buffer);
206213
}
214+
215+
it('should run tasks in parallel, allowing for long running tasks along side faster tasks', async () => {
216+
/**
217+
* It's worth noting this test relies on the /event endpoint that forces Task Manager to hold off
218+
* on completing a task until a call is made by the test suite.
219+
* If we begin testing with multiple Kibana instacnes in Parallel this will likely become flaky.
220+
* If you end up here because the test is flaky, this might be why.
221+
*/
222+
const fastTask = await scheduleTask({
223+
taskType: 'sampleTask',
224+
interval: `1s`,
225+
params: { },
226+
});
227+
228+
const longRunningTask = await scheduleTask({
229+
taskType: 'sampleTask',
230+
interval: `1s`,
231+
params: {
232+
waitForEvent: 'rescheduleHasHappened'
233+
},
234+
});
235+
236+
function getTaskById(tasks, id) {
237+
return tasks.filter(task => task.id === id)[0];
238+
}
239+
240+
await retry.try(async () => {
241+
const tasks = (await currentTasks()).docs;
242+
expect(getTaskById(tasks, fastTask.id).state.count).to.eql(2);
243+
});
244+
245+
await releaseTasksWaitingForEventToComplete('rescheduleHasHappened');
246+
247+
await retry.try(async () => {
248+
const tasks = (await currentTasks()).docs;
249+
250+
expect(getTaskById(tasks, fastTask.id).state.count).to.greaterThan(2);
251+
expect(getTaskById(tasks, longRunningTask.id).state.count).to.eql(1);
252+
});
253+
});
207254
});
208255
}

0 commit comments

Comments
 (0)