[Fleet] moving action batching to async #138870
Conversation
skipSuccess
)
);
const batchSize = options.batchSize ?? SO_SEARCH_LIMIT;
For testing locally with fewer than 10k agents enrolled, set a lower number here:
const batchSize = 1000; // options.batchSize ?? SO_SEARCH_LIMIT;
Or invoke it from the API:
POST kbn:/api/fleet/agents/bulk_reassign
{
"agents": " fleet-agents.policy_id : (\"e57948b0-1d55-11ed-85fe-e34c31cf865b\" or \"26594150-1d56-11ed-85fe-e34c31cf865b\")",
"policy_id": "e57948b0-1d55-11ed-85fe-e34c31cf865b",
"batchSize": 1000
}
start_time: startTime ?? now,
minimum_execution_duration: MINIMUM_EXECUTION_DURATION_SECONDS,
expiration: moment(startTime)
  .add(MINIMUM_EXECUTION_DURATION_SECONDS, 'seconds')
The default expiration with "upgrade immediately" is 30m, which is too short for larger agent selections, e.g. 20k. When I tried upgrading immediately, the action still completed for all agents despite expiring about halfway through. I am not sure if it is expected behavior that Fleet Server continues to execute the action even after expiration.
Yes, we could probably have a larger expiration here and a larger minimum execution duration (these need to be the same) for the immediate upgrade to happen.
The action should be delivered immediately to the agent, but Fleet Server may have trouble getting all the agent acks within 30 minutes.
Is there any disadvantage to increasing the minimum execution duration to e.g. 2 hours? Updated.
I think increasing the minimum execution duration to 2 hours will mean that every rolling upgrade shorter than 2 hours will not really be a rolling upgrade. That is maybe acceptable, or maybe we can set this to two hours or more only for the immediate scenario.
The previous logic of the rolling upgrade was to set the minimum execution duration to 30m and the expiration to the selected duration.
I updated the minimum to be 2h (or 1h if that was selected as the window).
Previously:
// Perform a rolling upgrade
if (upgradeDurationSeconds) {
  return {
    start_time: startTime ?? now,
    minimum_execution_duration: MINIMUM_EXECUTION_DURATION_SECONDS,
    expiration: moment(startTime ?? now)
      .add(upgradeDurationSeconds, 'seconds')
      .toISOString(),
  };
}
Updated:
// Perform a rolling upgrade
if (upgradeDurationSeconds) {
  return {
    start_time: startTime ?? now,
    minimum_execution_duration: Math.min(
      MINIMUM_EXECUTION_DURATION_SECONDS,
      upgradeDurationSeconds
    ),
    expiration: moment(startTime ?? now)
      .add(upgradeDurationSeconds, 'seconds')
      .toISOString(),
  };
}
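For comparison, the immediate (non-rolling) fall-through shown in the diff above would then look roughly like this, assuming MINIMUM_EXECUTION_DURATION_SECONDS is raised to 2 hours; both values stay equal so the action is delivered right away while Fleet Server gets 2 hours to collect acks before expiration (a sketch, not the exact diff):
// Sketch of the immediate (non-rolling) fall-through, assuming
// MINIMUM_EXECUTION_DURATION_SECONDS = 2 * 60 * 60 (2 hours).
return {
  start_time: startTime ?? now,
  minimum_execution_duration: MINIMUM_EXECUTION_DURATION_SECONDS,
  expiration: moment(startTime ?? now)
    .add(MINIMUM_EXECUTION_DURATION_SECONDS, 'seconds')
    .toISOString(),
};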
Pinging @elastic/fleet (Team:Fleet)
start_time?: string;
minimum_execution_duration?: number;
source_uri?: string;
total?: number;
Added the total count, which represents how many agents were actioned (selected by the user); this helps with status reporting in case something goes wrong while creating the action documents in batches.
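Conceptually it works like the sketch below (a sketch only, not the exact PR code; the createAgentAction call shape, the action type value, and the lodash chunk helper are illustrative assumptions, and the client/type imports are assumed to match the surrounding Fleet code):
import { chunk } from 'lodash';

// Sketch: `total` is the full size of the user's selection, captured once up front,
// while each action document's `agents` array only holds one batch of ids.
async function createBatchedActions(
  esClient: ElasticsearchClient,
  allSelectedAgentIds: string[],
  batchSize: number
) {
  const total = allSelectedAgentIds.length;
  for (const agents of chunk(allSelectedAgentIds, batchSize)) {
    await createAgentAction(esClient, {
      type: 'UNENROLL',
      agents,
      total, // lets the /action_status API compare created vs. actioned counts
    });
  }
}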
export interface ActionStatus {
  actionId: string;
  nbAgentsActionCreated: number;
nbAgentsActionCreated represents how many agents are included in the .fleet-actions documents. If this is less than nbAgentsActioned, it indicates that something went wrong with the Kibana batch processing.
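As a rough illustration of how those counters could be compared for status reporting (the interface and helper here are illustrative, not the PR's actual code):
// Sketch: if fewer agents ended up in .fleet-actions documents than the user
// targeted, Kibana's batch processing did not finish cleanly.
interface ActionCounts {
  nbAgentsActioned: number; // agents the user selected (the `total` field above)
  nbAgentsActionCreated: number; // agents written into .fleet-actions documents
}

function kibanaProcessingLooksComplete(counts: ActionCounts): boolean {
  return counts.nbAgentsActionCreated >= counts.nbAgentsActioned;
}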
await new Promise((resolve, reject) => {
  let attempts = 0;
  const intervalId = setInterval(async () => {
For testing the async execution of bulk actions, I had to wait on a Promise; otherwise the FTR shutdown starts and the action doesn't finish.
Added a 1s interval, checked up to 3 times, to verify whether the action has completed.
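The waiting logic is roughly as follows (a sketch of the described approach — 1s interval, up to 3 attempts; isActionComplete and actionId stand in for whatever completion check the FTR test actually performs):
// Sketch: keep the FTR test alive until the async bulk action finishes,
// polling once per second and giving up after 3 attempts.
await new Promise<void>((resolve, reject) => {
  let attempts = 0;
  const intervalId = setInterval(async () => {
    attempts++;
    if (await isActionComplete(actionId)) {
      clearInterval(intervalId);
      resolve();
    } else if (attempts >= 3) {
      clearInterval(intervalId);
      reject(new Error(`action ${actionId} did not complete after ${attempts} checks`));
    }
  }, 1000);
});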
kpollich left a comment
I haven't gotten a chance to run through every action type locally, but I spent yesterday and this morning walking through the code, and things look good to me. It's definitely more OOP than we have elsewhere in the Fleet codebase, but I think the patterns used here, with a base class implementing the bulk action processing needs, all make sense in this context.
I'll offer my approval, but I think @nchaulet should probably continue to review here as well.
@@ -0,0 +1,143 @@
/*
Thanks for adding this script to help with testing and local dev setup.
},
{
  range: {
    expiration: { gte: now },
I think removing the expiration condition from this query makes sense.
 * On errors, starts a task with Task Manager to retry max 3 times
 * If the last batch was stored in state, retry continues from there (searchAfter)
 */
public async runActionAsyncWithRetry(): Promise<{ items: BulkActionResult[]; actionId: string }>
That method seems like a good candidate to be unit tested, and that will help document the expected behavior.
Do you mean unit tested?
Yes, I am planning to add more tests after I complete the UI part.
Maybe a dumb question, but I am wondering if we considered using the task scheduler for every bulk action creation and not just for retries? It could make things easier to understand, and probably more scalable if the task manager is hosted on a separate Kibana instance that only runs tasks in the future. I do not like having a long-running task kicked off from a request; it seems like something that can break to me.
Good question, this was the original plan; however, I revised it after feedback from the Alerting team, who are heavy users of Task Manager. They suggested avoiding long-running tasks, because they might impact the schedule of other tasks. Also, depending on how many tasks are waiting to run, it might take some time for TM to pick up a task. So I thought it is a better idea to start the execution immediately.
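The resulting flow is roughly the sketch below: the batches run immediately in the request, and Task Manager is only involved when something fails. The task type constant, the state shape, and runBatchesFrom are assumptions for illustration; ensureScheduled is the Task Manager method for idempotently scheduling a task by id.
// Sketch: run the long-running batching right away; only fall back to a
// Task Manager task (retried up to 3 times) if the in-request run fails.
try {
  await runBatchesFrom(searchAfter); // hypothetical helper doing the actual batching
} catch (error) {
  await taskManagerStart.ensureScheduled({
    id: `fleet:retry_bulk_action:${actionId}`,
    taskType: BULK_ACTION_RETRY_TASK_TYPE, // hypothetical constant
    params: { actionId },
    state: { retryCount: 1, searchAfter }, // resume from the last stored batch
    scope: ['fleet'],
  });
}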
@elasticmachine merge upstream
@nchaulet @joshdover are you okay if I merge this today?
@juliaElastic Let me take a look at this now, there were just a few key things I wanted to review.
return Object.values(
  res.hits.hits.reduce((acc, hit) => {
Optional nit: this combo of Object.values + reduce could probably be simplified to a single flatMap call.
Not quite, the result here is an object, not an array. I reused most of this logic from the existing current_upgrades API.
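For context, the pattern under discussion is roughly the simplified sketch below: the reduce builds an object keyed by action id so hits belonging to the same action get merged, and Object.values then turns that object back into an array, which is why a single flatMap over the hits would not be equivalent (field names follow .fleet-actions documents; the merge logic is simplified):
// Simplified sketch (not the exact production code): group .fleet-actions hits
// by action_id, merging the agents arrays of documents that share an action.
const byActionId = res.hits.hits.reduce((acc, hit) => {
  const actionId = hit._source!.action_id;
  if (!acc[actionId]) {
    acc[actionId] = { actionId, agents: [] as string[] };
  }
  acc[actionId].agents.push(...(hit._source!.agents ?? []));
  return acc;
}, {} as Record<string, { actionId: string; agents: string[] }>);

return Object.values(byActionId);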
let actions = await _getActions(esClient);
const cancelledActionIds = await _getCancelledActionId(esClient);

// Fetch acknowledged result for every action
actions = await pMap(
  actions,
  async (action) => {
    const { count } = await esClient.count({
      index: AGENT_ACTIONS_RESULTS_INDEX,
      ignore_unavailable: true,
      query: {
        bool: {
          must: [
            {
              term: {
                action_id: action.actionId,
              },
            },
          ],
        },
      },
    });
I'm concerned about the number of queries here. We're fetching all actions, then doing a separate query for action results for every action. Instead I think we should try combining that into a single query and use aggregations to get the count value that we need.
Likely something like this (pseudo-ish code):
let actions = await _getActions(esClient);
const acks = await esClient.search({
  index: AGENT_ACTIONS_RESULTS_INDEX,
  query: {
    bool: {
      // There's some perf/caching advantages to using filter over must
      // See https://www.elastic.co/guide/en/elasticsearch/reference/current/query-filter-context.html#filter-context
      // A single terms clause matches any of the action ids.
      filter: [{ terms: { action_id: actions.map((a) => a.id) } }],
    },
  },
  aggs: {
    ack_counts: { terms: { field: 'action_id' } },
  },
});
return actions.map((a) => {
  return {
    ...a,
    nbAgentsAck: acks.aggregations.ack_counts.buckets.find((b) => b.key === a.id)?.doc_count ?? 0,
    // other fields
  };
});
Is it okay if I refactor this in the next PR, about the Agent activity UI? I already had to make changes to add more info.
I'd be okay with refactoring this fetch operation all at once in the next PR. Let's try to land this one and move on to the UI.
}

async function _getActions(esClient: ElasticsearchClient) {
  const res = await esClient.search<FleetServerAgentAction>({
Do we need to search for all actions? Is there some way to only filter on relevant or recent ones? Similar question on the canceled actions query
We could add a query param to limit the results. Will change this in the next pr.
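One possible shape for that (a sketch; the perPage parameter, the @timestamp field, the two-week default window, and the AGENT_ACTIONS_INDEX constant for .fleet-actions are assumptions, not the actual follow-up):
// Sketch: cap the page size and only look at recent actions instead of all of them.
async function _getRecentActions(
  esClient: ElasticsearchClient,
  options?: { perPage?: number; since?: string }
) {
  return esClient.search<FleetServerAgentAction>({
    index: AGENT_ACTIONS_INDEX,
    size: options?.perPage ?? 100,
    sort: [{ '@timestamp': { order: 'desc' } }],
    query: {
      bool: {
        filter: [{ range: { '@timestamp': { gte: options?.since ?? 'now-2w' } } }],
      },
    },
  });
}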
 * On errors, starts a task with Task Manager to retry max 3 times
 * If the last batch was stored in state, retry continues from there (searchAfter)
I wonder if we should be checkpointing the state regardless of whether or not there is an error. We need to consider the case where Kibana crashes or is otherwise shut down. Ideally in this case there's always a task scheduled to confirm that the process was completed successfully. This may require some significant changes to this PR and if so, we could handle it in a follow up.
Yes, I'll tackle this separately.
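A rough sketch of what that checkpointing could look like (hypothetical helpers and state shape; not the actual follow-up implementation):
// Sketch: persist progress after every batch, not only on error, so a scheduled
// verification task can resume from the last sort values if Kibana dies mid-run.
async function processWithCheckpoints(
  actionId: string,
  batches: AsyncIterable<{ agentIds: string[]; lastSortValues: unknown[] }>
) {
  let completedBatches = 0;
  for await (const batch of batches) {
    await createActionForBatch(actionId, batch.agentIds); // hypothetical helper
    completedBatches++;
    await saveCheckpoint({ actionId, searchAfter: batch.lastSortValues, completedBatches });
  }
}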
@elasticmachine merge upstream
💛 Build succeeded, but was flaky
Summary
Relates #141567
Changes for agent bulk actions reassign, unenroll, upgrade, update tags:
- searchAfter parameter.
- /action_status API to report on the progress of actions -> I moved the UI part out of this pr, it will be covered in another one in the form of a flyout.
Pending work:
- .fleet-actions-status index to save failed state with error message when kibana processing fails.
- .fleet-action-status index from this pr to simplify, will move to another pr. Also it probably needs an elasticsearch change to add the new index mapping.
- .fleet-actions index, so we currently don't have any info about the action status.
Testing with ess (pr build docker image), 8 GB integration server memory
https://admin.found.no/deployments/af21b782629cb35eacb6b66ae157d211
reassign 20k agents, kibana processing took 15s, FS processing took about 21m (first 10k took 7m, slowed down for second half)
upgrade 20k agents, kibana processing took 15s, FS processing did 11k after 30m, took about 1hr to complete all (action expired after 30m)
unenroll 20k, kibana processing took 13s, FS processing took 20m
reassign 30k agents, kibana processing took 21s, FS processing took 31m
On average it looks like <10s per 10k agents for Kibana processing, and around 10m per 10k for FS processing.
Checklist