-
Notifications
You must be signed in to change notification settings - Fork 8.5k
Core task manager #24356
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Core task manager #24356
Changes from 30 commits
Commits
Show all changes
53 commits
Select commit
Hold shift + click to select a range
81c8561
Core task manager (#23555)
chrisdavies 0eeb368
spelling
tsullivan ad20601
readme fix
tsullivan f767ba7
Merge branch 'master' into feature-alerting/task-manager
tsullivan 9f2ffc9
fix test code impacted by hapi upgrade
tsullivan 756d189
Merge branch 'master' into alerting/task-manager
njd5475 bac2f04
Merge branch 'master' into feature-alerting/task-manager
tsullivan 19a3096
Task Manager index creation changes (#24542)
njd5475 98e396a
Improve taskRunner's processResult (#24880)
tsullivan ca9b663
Merge branch 'master' into feature-alerting/task-manager
tsullivan 86ca944
Merge branch 'master' into feature-alerting/task-manager
tsullivan fb2b7f1
Merge branch 'master' into feature-alerting/task-manager
tsullivan 49bde06
Merge branch 'master' into alerting/task-manager
njd5475 863f975
Merge branch 'master' into feature-alerting/task-manager
tsullivan f4645b8
Merge branch 'master' into alerting/task-manager
njd5475 a38ec14
Merge branch 'master' into alerting/task-manager
njd5475 b545a2c
Test alerting demo (#25136)
njd5475 21a9065
Merge branch 'master' into alerting/task-manager
njd5475 5b3cd9e
Explains how to add static task instances to task definitions. (#25880)
njd5475 9a55abd
Merge branch 'master' into alerting/task-manager
njd5475 4601f15
Tasks cannot be scheduled until task manager is initialized.
njd5475 e888c7a
Adds task manager api integration tests to ci group4.
njd5475 2380179
Context of describe test must be the test framework object.
njd5475 8b80e9f
Update src/es_archiver/lib/indices/kibana_index.js
Bamieh f43cec4
Merge branch 'master' into feature-alerting/task-manager
tsullivan 5e7761a
verify fillPool exception passing
tsullivan 224fa4f
readme update about max_workers/numWorkers
tsullivan a8f78fc
change forEach to reduce
tsullivan d5c5944
use public interfaces in internal method
tsullivan 72c5f98
replace getMaxAttempts with public readonly maxAttempts
tsullivan 3ce81fd
Update x-pack/plugins/task_manager/task_store.ts
pickypg 7c9ce65
min = 1 for max_workers
pickypg 0486d31
timeOut => timeout
tsullivan 1715242
min 1
tsullivan 447d484
scope as an array
tsullivan 22e7030
no retries
pickypg e4d96f0
ConcreteTaskInstance is a TaskInstance
tsullivan a8c5853
remove guard per joi logic
tsullivan 0321d04
Merge branch 'master' into feature-alerting/task-manager
tsullivan f847a72
Merge branch 'alerting/task-manager' of github.com:elastic/kibana int…
tsullivan 342b7fd
more return types for functions
tsullivan 741e3e0
more comments around incremental back-off
pickypg f05e59f
throw error instead of return undefined
tsullivan b2c1907
poll_interval min 1000
tsullivan 589da32
avoid handling err.stack directly
tsullivan 1ffdf38
break up processResult
tsullivan 99094dc
fix a few runtime issues
tsullivan 75d03e1
only fetch idle tasks
tsullivan 4ae4ac3
remove check for status idle
tsullivan 5693785
always return a state, and when there is an error return the state we…
tsullivan 4e535e4
check isStarted before attemptWork
pickypg 958928d
ts fix
tsullivan 42280d0
Merge branch 'master' into feature-alerting/task-manager
tsullivan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| /* | ||
| * Licensed to Elasticsearch B.V. under one or more contributor | ||
| * license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright | ||
| * ownership. Elasticsearch B.V. licenses this file to you under | ||
| * the Apache License, Version 2.0 (the "License"); you may | ||
| * not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| import { mergeAtType } from './reduce'; | ||
| import { alias, wrap, uniqueKeys } from './modify_reduce'; | ||
|
|
||
| // How plugins define tasks that the task manager can run. | ||
| export const taskDefinitions = wrap( | ||
| alias('taskDefinitions'), | ||
| uniqueKeys(), | ||
| mergeAtType, | ||
| ); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,306 @@ | ||
| # Kibana task manager | ||
|
|
||
| The task manager is a generic system for running background tasks. It supports: | ||
|
|
||
| - Single-run and recurring tasks | ||
| - Scheduling tasks to run after a specified datetime | ||
| - Basic retry logic | ||
| - Recovery of stalled tasks / timeouts | ||
| - Tracking task state across multiple runs | ||
| - Configuring the run-parameters for specific tasks | ||
| - Basic coordination to prevent the same task instance from running on more than one Kibana system at a time | ||
|
|
||
| ## Implementation details | ||
|
|
||
| At a high-level, the task manager works like this: | ||
|
|
||
| - Every `{poll_interval}` milliseconds, check the `{index}` for any tasks that need to be run: | ||
| - `runAt` is past | ||
| - `attempts` is less than the configured threshold | ||
| - Attempt to claim the task by using optimistic concurrency to set: | ||
| - status to `running` | ||
| - `runAt` to now + the timeout specified by the task | ||
| - Execute the task, if the previous claim succeeded | ||
| - If the task fails, increment the `attempts` count and reschedule it | ||
| - If the task succeeds: | ||
| - If it is recurring, store the result of the run, and reschedule | ||
| - If it is not recurring, remove it from the index | ||
|
|
||
| ## Pooling | ||
|
|
||
| Each task manager instance runs tasks in a pool which ensures that at most N tasks are run at a time, where N is configurable. This prevents the system from running too many tasks at once in resource constrained environments. In addition to this, each individual task type definition can have `numWorkers` specified, which tells the system how many workers are consumed by a single running instance of a task. This effectively limits how many tasks of a given type can be run at once. | ||
|
|
||
| For example, we may have a system with a `max_workers` of 10, but a super expensive task (such as `reporting`) which specifies a `numWorkers` of 10. In this case, `reporting` tasks will run one at a time. | ||
|
|
||
| If a task specifies a higher `numWorkers` than the system supports, the system's `max_workers` setting will be substituted for it. | ||
|
|
||
| ## Config options | ||
|
|
||
| The task_manager can be configured via `taskManager` config options (e.g. `taskManager.maxAttempts`): | ||
|
|
||
| - `max_attempts` - How many times a failing task instance will be retried before it is never run again | ||
| - `poll_interval` - How often the background worker should check the task_manager index for more work | ||
| - `index` - The name of the index that the task_manager | ||
| - `max_workers` - The maximum number of tasks a Kibana will run concurrently (defaults to 10) | ||
| - `credentials` - Encrypted user credentials. All tasks will run in the security context of this user. See [this issue](https://github.com/elastic/dev/issues/1045) for a discussion on task scheduler security. | ||
| - `override_num_workers`: An object of `taskType: number` that overrides the `num_workers` for tasks | ||
| - For example: `task_manager.override_num_workers.reporting: 2` would override the number of workers occupied by tasks of type `reporting` | ||
| - This allows sysadmins to tweak the operational performance of Kibana, allowing more or fewer tasks of a specific type to run simultaneously | ||
|
|
||
| ## Task definitions | ||
|
|
||
| Plugins define tasks by calling the `registerTaskDefinitions` method on the `server.taskManager` object. | ||
|
|
||
| A sample task can be found in the [x-pack/test/plugin_api_integration/plugins/task_manager](../../test/plugin_api_integration/plugins/task_manager/index.js) folder. | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove this part: this test config is going away |
||
|
|
||
| ```js | ||
| const { taskManager } = server; | ||
| taskManager.registerTaskDefinitions({ | ||
| // clusterMonitoring is the task type, and must be unique across the entire system | ||
| clusterMonitoring: { | ||
| // Human friendly name, used to represent this task in logs, UI, etc | ||
| title: 'Human friendly name', | ||
|
|
||
| // Optional, human-friendly, more detailed description | ||
| description: 'Amazing!!', | ||
|
|
||
| // Optional, how long, in minutes, the system should wait before | ||
| // a running instance of this task is considered to be timed out. | ||
| // This defaults to 5 minutes. | ||
| timeOut: '5m', | ||
|
|
||
| // The clusterMonitoring task occupies 2 workers, so if the system has 10 worker slots, | ||
| // 5 clusterMonitoring tasks could run concurrently per Kibana instance. This value is | ||
| // overridden by the `override_num_workers` config value, if specified. | ||
| numWorkers: 2, | ||
|
|
||
| // The createTaskRunner function / method returns an object that is responsible for | ||
| // performing the work of the task. context: { taskInstance, kbnServer }, is documented below. | ||
| createTaskRunner(context) { | ||
| return { | ||
| // Perform the work of the task. The return value should fit the TaskResult interface, documented | ||
| // below. Invalid return values will result in a logged warning. | ||
| async run() { | ||
| // Do some work | ||
| // Conditionally send some alerts | ||
| // Return some result or other... | ||
| }, | ||
|
|
||
| // Optional, will be called if a running instance of this task times out, allowing the task | ||
| // to attempt to clean itself up. | ||
| async cancel() { | ||
| // Do whatever is required to cancel this task, such as killing any spawned processes | ||
| }, | ||
| }; | ||
| }, | ||
| }, | ||
| }); | ||
| ``` | ||
|
|
||
| When Kibana attempts to claim and run a task instance, it looks its definition up, and executes its createTaskRunner's method, passing it a run context which looks like this: | ||
|
|
||
| ```js | ||
| { | ||
| // An instance of the Kibana server object. | ||
| kbnServer, | ||
|
|
||
| // The data associated with this instance of the task, with two properties being most notable: | ||
| // | ||
| // params: | ||
| // An object, specific to this task instance, used by the | ||
| // task to determine exactly what work should be performed. | ||
| // e.g. a cluster-monitoring task might have a `clusterName` | ||
| // property in here, but a movie-monitoring task might have | ||
| // a `directorName` property. | ||
| // | ||
| // state: | ||
| // The state returned from the previous run of this task instance. | ||
| // If this task instance has never succesfully run, this will | ||
| // be an empty object: {} | ||
| taskInstance, | ||
| } | ||
| ``` | ||
|
|
||
| ## Task result | ||
|
|
||
| The task runner's `run` method is expected to return a promise that resolves to undefined or to an object that looks like the following: | ||
| ```js | ||
| { | ||
| // Optional, if specified, this is used as the tasks' nextRun, overriding | ||
| // the default system scheduler. | ||
| runAt: "2020-07-24T17:34:35.272Z", | ||
|
|
||
| // Optional, an error object, logged out as a warning. The pressence of this | ||
| // property indicates that the task did not succeed. | ||
| error: { message: 'Hrumph!' }, | ||
|
|
||
| // Optional, this will be passed into the next run of the task, if | ||
| // this is a recurring task. | ||
| state: { | ||
| anything: 'goes here', | ||
| }, | ||
| } | ||
| ``` | ||
|
|
||
| Other return values will result in a warning, but the system should continue to work. | ||
|
|
||
| ## Task instances | ||
|
|
||
| The task_manager module will store scheduled task instances in an index. This allows for recovery of failed tasks, coordination across Kibana clusters, persistence across Kibana reboots, etc. | ||
|
|
||
| The data stored for a task instance looks something like this: | ||
|
|
||
| ```js | ||
| { | ||
| // The type of task that will run this instance. | ||
| taskType: 'clusterMonitoring', | ||
|
|
||
| // The next time this task instance should run. It is not guaranteed | ||
| // to run at this time, but it is guaranteed not to run earlier than | ||
| // this. | ||
| runAt: "2020-07-24T17:34:35.272Z", | ||
|
|
||
| // Indicates that this is a recurring task. We currently only support | ||
| // 1 minute granularity. | ||
| interval: '5m', | ||
|
|
||
| // How many times this task has been unsuccesfully attempted, | ||
| // this will be reset to 0 if the task ever succesfully completes. | ||
| // This is incremented if a task fails or times out. | ||
| attempts: 0, | ||
|
|
||
| // Currently, this is either idle | running. It is used to | ||
| // coordinate which Kibana instance owns / is running a specific | ||
| // task instance. | ||
| status: 'idle', | ||
|
|
||
| // The params specific to this task instance, which will be | ||
| // passed to the task when it runs, and will be used by the | ||
| // task to determine exactly what work should be performed. | ||
| // This is a JSON blob, and will be different per task type. | ||
| // e.g. a cluster-monitoring task might have a `clusterName` | ||
| // property in here, but a movie-monitoring task might have | ||
| // a `directorName` property. | ||
| params: '{ "task": "specific stuff here" }', | ||
|
|
||
| // The result of the previous run of this task instance. This | ||
| // will be passed to the next run of the task, along with the | ||
| // params, and could be used by a task to do special logic If | ||
| // the task state changes (e.g. from green to red, or foo to bar) | ||
| // If there was no previous run (e.g. the instance has never succesfully | ||
| // completed, this will be an empty object.). This is a JSON blob, | ||
| // and will be different per task type. | ||
| state: '{ "status": "green" }', | ||
|
|
||
| // An extension point for 3rd parties to build in security features on | ||
| // top of the task manager. For example, this might be the token of the user | ||
| // who scheduled this task. | ||
| userContext: 'the token of the user who scheduled this task', | ||
|
|
||
| // An extension point for 3rd parties to build in security features on | ||
| // top of the task manager, and is expected to be the id of the user, if any, | ||
| // that scheduled this task. | ||
| user: '23lk3l42', | ||
|
|
||
| // An application-specific designation, allowing different Kibana | ||
| // plugins / apps to query for only those tasks they care about. | ||
| scope: 'alerting', | ||
| } | ||
| ``` | ||
|
|
||
| ## Programmatic access | ||
|
|
||
| The task manager mixin exposes a taskManager object on the Kibana server which plugins can use to manage scheduled tasks. Each method takes an optional `scope` argument and ensures that only tasks with the specified scope(s) will be affected. | ||
|
|
||
| ```js | ||
| const { taskManager } = server; | ||
| // Schedules a task. All properties are as documented in the previous | ||
| // storage section, except that here, params is an object, not a JSON | ||
| // string. | ||
| const task = await taskManager.schedule({ | ||
| taskType, | ||
| runAt, | ||
| interval, | ||
| params, | ||
| scope: 'my-fanci-app', | ||
| }); | ||
|
|
||
| // Removes the specified task | ||
| await manager.remove(task.id); | ||
|
|
||
| // Fetches tasks, supports pagination, via the search-after API: | ||
| // https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-search-after.html | ||
| // If scope is not specified, all tasks are returned, otherwise only tasks | ||
| // with the given scope are returned. | ||
| const results = await manager.find({ scope: 'my-fanci-app', searchAfter: ['ids'] }); | ||
|
|
||
| // results look something like this: | ||
| { | ||
| searchAfter: ['233322'], | ||
| // Tasks is an array of task instances | ||
| tasks: [{ | ||
| id: '3242342', | ||
| taskType: 'reporting', | ||
| // etc | ||
| }] | ||
| } | ||
| ``` | ||
|
|
||
| More custom access to the tasks can be done directly via Elasticsearch, though that won't be officially supported, as we can change the document structure at any time. | ||
|
|
||
| ## Middleware | ||
|
|
||
| The task manager exposes a middleware layer that allows modifying tasks before they are scheduled / persisted to the task manager index, and modifying tasks / the run context before a task is run. | ||
|
|
||
| For example: | ||
|
|
||
| ```js | ||
| // In your plugin's init | ||
| server.taskManager.addMiddleware({ | ||
| async beforeSave({ taskInstance, ...opts }) { | ||
| console.log(`About to save a task of type ${taskInstance.taskType}`); | ||
|
|
||
| return { | ||
| ...opts, | ||
| taskInstance: { | ||
| ...taskInstance, | ||
| params: { | ||
| ...taskInstance.params, | ||
| example: 'Added to params!', | ||
| }, | ||
| }, | ||
| }; | ||
| }, | ||
|
|
||
| async beforeRun({ taskInstance, ...opts }) { | ||
| console.log(`About to run ${taskInstance.taskType} ${taskInstance.id}`); | ||
| const { example, ...taskWithoutExampleProp } = taskInstance; | ||
|
|
||
| return { | ||
| ...opts, | ||
| taskInstance: taskWithoutExampleProp, | ||
| }; | ||
| }, | ||
| }); | ||
| ``` | ||
|
|
||
| ## Limitations in v1.0 | ||
|
|
||
| In v1, the system only understands 1 minute increments (e.g. '1m', '7m'). Tasks which need something more robust will need to specify their own "runAt" in their run method's return value. | ||
|
|
||
| There is only a rudimentary mechanism for coordinating tasks and handling expired tasks. Tasks are considered expired if their runAt has arrived, and their status is still 'running'. | ||
|
|
||
| There is no task history. Each run overwrites the previous run's state. One-time tasks are removed from the index upon completion regardless of success / failure. | ||
|
|
||
| The task manager's public API is create / delete / list. Updates aren't directly supported, and listing should be scoped so that users only see their own tasks. | ||
|
|
||
| ## Testing | ||
|
|
||
| - `node scripts/jest --testPathPattern=task_manager --watch` | ||
|
|
||
| Integration tests can be run like so: | ||
|
|
||
| ``` | ||
| node scripts/functional_tests_server.js --config test/plugin_functional/config.js | ||
| node scripts/functional_test_runner --config test/plugin_functional/config.js --grep task_manager | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| ``` | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe you mean
1here instead of10since the end result is reporting will run one at a time?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is correct as written. The takeaway is that Reporting can define a task type and specify
10as the numWorkers. Only a Kibana instance with 10 workers free can claim the task.Unless the instance has
max_workersof 20 in its configuration, the instance is capable of running only one of those types of tasks at a time.