From 8bdd931c04eb25b4e255f601e7ac829b4771b4cb Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Thu, 20 Aug 2020 13:21:15 -0400 Subject: [PATCH 1/3] [Ingest Manager] Remove useless saved object update in agent checkin --- .../server/services/agents/checkin/index.ts | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/index.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/index.ts index ece38f86b4987..759cafa6c803f 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/index.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/index.ts @@ -4,6 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ +import deepEqual from 'fast-deep-equal'; import { SavedObjectsClientContract, SavedObjectsBulkCreateObject } from 'src/core/server'; import { Agent, @@ -29,18 +30,23 @@ export async function agentCheckin( ) { const updateData: Partial = {}; const { updatedErrorEvents } = await processEventsForCheckin(soClient, agent, data.events); - if (updatedErrorEvents) { + if ( + updatedErrorEvents && + !(updatedErrorEvents.length === 0 && agent.current_error_events.length === 0) + ) { updateData.current_error_events = JSON.stringify(updatedErrorEvents); } - if (data.localMetadata) { + if (data.localMetadata && !deepEqual(data.localMetadata, agent.local_metadata)) { updateData.local_metadata = data.localMetadata; } - if (data.status !== agent.last_checkin_status) { updateData.last_checkin_status = data.status; } + // Update agent only if something changed if (Object.keys(updateData).length > 0) { - await soClient.update(AGENT_SAVED_OBJECT_TYPE, agent.id, updateData); + await soClient.update(AGENT_SAVED_OBJECT_TYPE, agent.id, updateData, { + refresh: false, + }); } // Check if some actions are not acknowledged From 3907dad22e5b873b42636d82295ff7925f4f541b Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Thu, 20 Aug 2020 18:55:30 -0400 Subject: [PATCH 2/3] Try better rate limiter --- .../services/agents/checkin/rxjs_utils.ts | 64 ++++++++++--------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts index dddade6841460..87578da899a94 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts @@ -47,33 +47,41 @@ export function createRateLimiter( ratelimitIntervalMs: number, ratelimitRequestPerInterval: number ) { - function createCurrentInterval() { - return { - startedAt: Rx.asyncScheduler.now(), - numRequests: 0, - }; + const waitingObservers = new Map, any>(); + + let tokens = ratelimitRequestPerInterval; + const tokenSubject = new Rx.BehaviorSubject(tokens); + function addToken() { + if (tokens < ratelimitRequestPerInterval) { + tokenSubject.next(++tokens); + } + } + + function consumeToken() { + tokenSubject.next(--tokens); } - let currentInterval: { startedAt: number; numRequests: number } = createCurrentInterval(); - let observers: Array<[Rx.Subscriber, any]> = []; - let timerSubscription: Rx.Subscription | undefined; + function isTokenAvailable() { + return tokens > 0; + } + + tokenSubject.subscribe(() => { + publishIfTokensAvailable(); + }); - function createTimeout() { - if (timerSubscription) { + function publishIfTokensAvailable() { + if (!isTokenAvailable()) { return; } - timerSubscription = Rx.asyncScheduler.schedule(() => { - timerSubscription = undefined; - currentInterval = createCurrentInterval(); - for (const [waitingObserver, value] of observers) { - if (currentInterval.numRequests >= ratelimitRequestPerInterval) { - createTimeout(); - continue; - } - currentInterval.numRequests++; - waitingObserver.next(value); - } - }, ratelimitIntervalMs); + const ite = waitingObservers.entries().next(); + if (!ite.done) { + consumeToken(); + waitingObservers.delete(ite.value[0]); + ite.value[0].next(ite.value[1]); + Rx.asyncScheduler.schedule(() => { + addToken(); + }, ratelimitIntervalMs); + } } return function limit(): Rx.MonoTypeOperatorFunction { @@ -81,14 +89,10 @@ export function createRateLimiter( new Rx.Observable((observer) => { const subscription = observable.subscribe({ next(value) { - if (currentInterval.numRequests < ratelimitRequestPerInterval) { - currentInterval.numRequests++; - observer.next(value); - return; - } + waitingObservers.delete(observer); + waitingObservers.set(observer, value); - observers = [...observers, [observer, value]]; - createTimeout(); + publishIfTokensAvailable(); }, error(err) { observer.error(err); @@ -99,7 +103,7 @@ export function createRateLimiter( }); return () => { - observers = observers.filter((o) => o[0] !== observer); + waitingObservers.delete(observer); subscription.unsubscribe(); }; }); From 0dc45d498eac688fe7c7ce214914418efe738c83 Mon Sep 17 00:00:00 2001 From: Nicolas Chaulet Date: Wed, 26 Aug 2020 09:30:57 -0400 Subject: [PATCH 3/3] Revert typo --- .../server/services/agents/checkin/index.ts | 4 +- .../services/agents/checkin/rxjs_utils.ts | 64 +++++++++---------- 2 files changed, 31 insertions(+), 37 deletions(-) diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/index.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/index.ts index 5ab2aa6483d88..19a5c2dc08762 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/index.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/index.ts @@ -44,9 +44,7 @@ export async function agentCheckin( } // Update agent only if something changed if (Object.keys(updateData).length > 0) { - await soClient.update(AGENT_SAVED_OBJECT_TYPE, agent.id, updateData, { - refresh: false, - }); + await soClient.update(AGENT_SAVED_OBJECT_TYPE, agent.id, updateData); } // Check if some actions are not acknowledged diff --git a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts index 87578da899a94..dddade6841460 100644 --- a/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts +++ b/x-pack/plugins/ingest_manager/server/services/agents/checkin/rxjs_utils.ts @@ -47,41 +47,33 @@ export function createRateLimiter( ratelimitIntervalMs: number, ratelimitRequestPerInterval: number ) { - const waitingObservers = new Map, any>(); - - let tokens = ratelimitRequestPerInterval; - const tokenSubject = new Rx.BehaviorSubject(tokens); - function addToken() { - if (tokens < ratelimitRequestPerInterval) { - tokenSubject.next(++tokens); - } - } - - function consumeToken() { - tokenSubject.next(--tokens); - } - - function isTokenAvailable() { - return tokens > 0; + function createCurrentInterval() { + return { + startedAt: Rx.asyncScheduler.now(), + numRequests: 0, + }; } - tokenSubject.subscribe(() => { - publishIfTokensAvailable(); - }); + let currentInterval: { startedAt: number; numRequests: number } = createCurrentInterval(); + let observers: Array<[Rx.Subscriber, any]> = []; + let timerSubscription: Rx.Subscription | undefined; - function publishIfTokensAvailable() { - if (!isTokenAvailable()) { + function createTimeout() { + if (timerSubscription) { return; } - const ite = waitingObservers.entries().next(); - if (!ite.done) { - consumeToken(); - waitingObservers.delete(ite.value[0]); - ite.value[0].next(ite.value[1]); - Rx.asyncScheduler.schedule(() => { - addToken(); - }, ratelimitIntervalMs); - } + timerSubscription = Rx.asyncScheduler.schedule(() => { + timerSubscription = undefined; + currentInterval = createCurrentInterval(); + for (const [waitingObserver, value] of observers) { + if (currentInterval.numRequests >= ratelimitRequestPerInterval) { + createTimeout(); + continue; + } + currentInterval.numRequests++; + waitingObserver.next(value); + } + }, ratelimitIntervalMs); } return function limit(): Rx.MonoTypeOperatorFunction { @@ -89,10 +81,14 @@ export function createRateLimiter( new Rx.Observable((observer) => { const subscription = observable.subscribe({ next(value) { - waitingObservers.delete(observer); - waitingObservers.set(observer, value); + if (currentInterval.numRequests < ratelimitRequestPerInterval) { + currentInterval.numRequests++; + observer.next(value); + return; + } - publishIfTokensAvailable(); + observers = [...observers, [observer, value]]; + createTimeout(); }, error(err) { observer.error(err); @@ -103,7 +99,7 @@ export function createRateLimiter( }); return () => { - waitingObservers.delete(observer); + observers = observers.filter((o) => o[0] !== observer); subscription.unsubscribe(); }; });