diff --git a/x-pack/plugins/ingest_manager/common/constants/agent.ts b/x-pack/plugins/ingest_manager/common/constants/agent.ts index e9226fa684925..7652c6ac87bce 100644 --- a/x-pack/plugins/ingest_manager/common/constants/agent.ts +++ b/x-pack/plugins/ingest_manager/common/constants/agent.ts @@ -16,3 +16,6 @@ export const AGENT_POLLING_THRESHOLD_MS = 30000; export const AGENT_POLLING_INTERVAL = 1000; export const AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS = 30000; export const AGENT_UPDATE_ACTIONS_INTERVAL_MS = 5000; + +export const AGENT_CONFIG_ROLLUP_RATE_LIMIT_INTERVAL_MS = 5000; +export const AGENT_CONFIG_ROLLUP_RATE_LIMIT_REQUEST_PER_INTERVAL = 60; diff --git a/x-pack/plugins/ingest_manager/common/types/index.ts b/x-pack/plugins/ingest_manager/common/types/index.ts index 7f81b04f5e84a..ff08b8a925204 100644 --- a/x-pack/plugins/ingest_manager/common/types/index.ts +++ b/x-pack/plugins/ingest_manager/common/types/index.ts @@ -24,6 +24,8 @@ export interface IngestManagerConfigType { host?: string; ca_sha256?: string; }; + agentConfigRollupRateLimitIntervalMs: number; + agentConfigRollupRateLimitRequestPerInterval: number; }; } diff --git a/x-pack/plugins/ingest_manager/server/constants/index.ts b/x-pack/plugins/ingest_manager/server/constants/index.ts index 650211ce9c1b2..d3c074ff2e8d0 100644 --- a/x-pack/plugins/ingest_manager/server/constants/index.ts +++ b/x-pack/plugins/ingest_manager/server/constants/index.ts @@ -10,6 +10,8 @@ export { AGENT_POLLING_THRESHOLD_MS, AGENT_POLLING_INTERVAL, AGENT_UPDATE_LAST_CHECKIN_INTERVAL_MS, + AGENT_CONFIG_ROLLUP_RATE_LIMIT_REQUEST_PER_INTERVAL, + AGENT_CONFIG_ROLLUP_RATE_LIMIT_INTERVAL_MS, AGENT_UPDATE_ACTIONS_INTERVAL_MS, INDEX_PATTERN_PLACEHOLDER_SUFFIX, // Routes diff --git a/x-pack/plugins/ingest_manager/server/index.ts b/x-pack/plugins/ingest_manager/server/index.ts index 5d6a1ad321b6d..811ec8a3d0222 100644 --- a/x-pack/plugins/ingest_manager/server/index.ts +++ b/x-pack/plugins/ingest_manager/server/index.ts @@ -37,6 +37,8 @@ export const config = { host: schema.maybe(schema.string()), ca_sha256: schema.maybe(schema.string()), }), + agentConfigRollupRateLimitIntervalMs: schema.number({ defaultValue: 5000 }), + agentConfigRollupRateLimitRequestPerInterval: schema.number({ defaultValue: 50 }), }), }), }; diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts index 1f9bba8b12be4..a806169019a1e 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts @@ -3,12 +3,13 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -import { Observable } from 'rxjs'; + +import * as Rx from 'rxjs'; export class AbortError extends Error {} export const toPromiseAbortable = ( - observable: Observable, + observable: Rx.Observable, signal?: AbortSignal ): Promise => new Promise((resolve, reject) => { @@ -41,3 +42,63 @@ export const toPromiseAbortable = ( signal.addEventListener('abort', listener, { once: true }); } }); + +export function createLimiter(ratelimitIntervalMs: number, ratelimitRequestPerInterval: number) { + function createCurrentInterval() { + return { + startedAt: Rx.asyncScheduler.now(), + numRequests: 0, + }; + } + + let currentInterval: { startedAt: number; numRequests: number } = createCurrentInterval(); + let observers: Array<[Rx.Subscriber, any]> = []; + let timerSubscription: Rx.Subscription | undefined; + + function createTimeout() { + if (timerSubscription) { + return; + } + timerSubscription = Rx.asyncScheduler.schedule(() => { + timerSubscription = undefined; + currentInterval = createCurrentInterval(); + for (const [waitingObserver, value] of observers) { + if (currentInterval.numRequests >= ratelimitRequestPerInterval) { + createTimeout(); + continue; + } + currentInterval.numRequests++; + waitingObserver.next(value); + } + }, ratelimitIntervalMs); + } + + return function limit(): Rx.MonoTypeOperatorFunction { + return (observable) => + new Rx.Observable((observer) => { + const subscription = observable.subscribe({ + next(value) { + if (currentInterval.numRequests < ratelimitRequestPerInterval) { + currentInterval.numRequests++; + observer.next(value); + return; + } + + observers = [...observers, [observer, value]]; + createTimeout(); + }, + error(err) { + observer.error(err); + }, + complete() { + observer.complete(); + }, + }); + + return () => { + observers = observers.filter((o) => o[0] !== observer); + subscription.unsubscribe(); + }; + }); + }; +} diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts index 0f30ab409f381..5ceb774a1946c 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/state_new_actions.ts @@ -28,7 +28,7 @@ import * as APIKeysService from '../../api_keys'; import { AGENT_SAVED_OBJECT_TYPE, AGENT_UPDATE_ACTIONS_INTERVAL_MS } from '../../../constants'; import { createAgentAction, getNewActionsSince } from '../actions'; import { appContextService } from '../../app_context'; -import { toPromiseAbortable, AbortError } from './rxjs_utils'; +import { toPromiseAbortable, AbortError, createLimiter } from './rxjs_utils'; function getInternalUserSOClient() { const fakeRequest = ({ @@ -95,19 +95,23 @@ async function getOrCreateAgentDefaultOutputAPIKey( return outputAPIKey.key; } -async function createAgentActionFromConfigIfOutdated( - soClient: SavedObjectsClientContract, - agent: Agent, - config: FullAgentConfig | null -) { +function shouldCreateAgentConfigAction(agent: Agent, config: FullAgentConfig | null): boolean { if (!config || !config.revision) { - return; + return false; } const isAgentConfigOutdated = !agent.config_revision || agent.config_revision < config.revision; if (!isAgentConfigOutdated) { - return; + return false; } + return true; +} + +async function createAgentActionFromConfig( + soClient: SavedObjectsClientContract, + agent: Agent, + config: FullAgentConfig | null +) { // Deep clone !not supporting Date, and undefined value. const newConfig = JSON.parse(JSON.stringify(config)); @@ -129,6 +133,11 @@ export function agentCheckinStateNewActionsFactory() { // Shared Observables const agentConfigs$ = new Map>(); const newActions$ = createNewActionsSharedObservable(); + // Rx operators + const rateLimiter = createLimiter( + appContextService.getConfig()?.fleet.agentConfigRollupRateLimitIntervalMs || 5000, + appContextService.getConfig()?.fleet.agentConfigRollupRateLimitRequestPerInterval || 50 + ); async function subscribeToNewActions( soClient: SavedObjectsClientContract, @@ -148,7 +157,9 @@ export function agentCheckinStateNewActionsFactory() { } const stream$ = agentConfig$.pipe( timeout(appContextService.getConfig()?.fleet.pollingRequestTimeout || 0), - mergeMap((config) => createAgentActionFromConfigIfOutdated(soClient, agent, config)), + filter((config) => shouldCreateAgentConfigAction(agent, config)), + rateLimiter(), + mergeMap((config) => createAgentActionFromConfig(soClient, agent, config)), merge(newActions$), mergeMap(async (data) => { if (!data) {