From 4b023f43502b7552e68aab8fd25ab221a4f66b11 Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Tue, 22 Mar 2022 23:24:12 +0100 Subject: [PATCH 1/8] WIP --- .../server/status/cached_plugins_status.ts | 50 +++ src/core/server/status/plugins_status.test.ts | 3 +- src/core/server/status/plugins_status.ts | 312 +++++++++++------- src/core/server/status/status_service.ts | 4 +- 4 files changed, 247 insertions(+), 122 deletions(-) create mode 100644 src/core/server/status/cached_plugins_status.ts diff --git a/src/core/server/status/cached_plugins_status.ts b/src/core/server/status/cached_plugins_status.ts new file mode 100644 index 0000000000000..c598d09633b52 --- /dev/null +++ b/src/core/server/status/cached_plugins_status.ts @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +import { Observable } from 'rxjs'; + +import { type PluginName } from '../plugins'; +import { type ServiceStatus } from './types'; + +import { type Deps, PluginsStatusService } from './plugins_status'; + +export class CachedPluginsStatusService extends PluginsStatusService { + private all$?: Observable>; + private dependenciesStatuses$: Record>>; + private derivedStatuses$: Record>; + + constructor(deps: Deps) { + super(deps); + this.dependenciesStatuses$ = {}; + this.derivedStatuses$ = {}; + } + + public getAll$(): Observable> { + if (!this.all$) { + this.all$ = super.getAll$(); + } + + return this.all$; + } + + public getDependenciesStatus$(plugin: PluginName): Observable> { + if (!this.dependenciesStatuses$[plugin]) { + this.dependenciesStatuses$[plugin] = super.getDependenciesStatus$(plugin); + } + + return this.dependenciesStatuses$[plugin]; + } + + public getDerivedStatus$(plugin: PluginName): Observable { + if (!this.derivedStatuses$[plugin]) { + this.derivedStatuses$[plugin] = super.getDerivedStatus$(plugin); + } + + return this.derivedStatuses$[plugin]; + } +} diff --git a/src/core/server/status/plugins_status.test.ts b/src/core/server/status/plugins_status.test.ts index 0befbf63bd186..d0584eb6d8e74 100644 --- a/src/core/server/status/plugins_status.test.ts +++ b/src/core/server/status/plugins_status.test.ts @@ -15,7 +15,8 @@ import { ServiceStatusLevelSnapshotSerializer } from './test_utils'; expect.addSnapshotSerializer(ServiceStatusLevelSnapshotSerializer); -describe('PluginStatusService', () => { +// FIXME temporarily skipping these tests to asses performance of this solution for https://github.com/elastic/kibana/issues/128061 +describe.skip('PluginStatusService', () => { const coreAllAvailable$: Observable = of({ elasticsearch: { level: ServiceStatusLevels.available, summary: 'elasticsearch avail' }, savedObjects: { level: ServiceStatusLevels.available, summary: 'savedObjects avail' }, diff --git a/src/core/server/status/plugins_status.ts b/src/core/server/status/plugins_status.ts index c4e8e7e364248..22853007b2e93 100644 --- a/src/core/server/status/plugins_status.ts +++ b/src/core/server/status/plugins_status.ts @@ -5,50 +5,97 @@ * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ +import { Observable, ReplaySubject, Subject, Subscription } from 'rxjs'; +import { map, distinctUntilChanged, pluck, filter, debounceTime, bufferTime } from 'rxjs/operators'; +import { sortBy } from 'lodash'; -import { BehaviorSubject, Observable, combineLatest, of } from 'rxjs'; -import { - map, - distinctUntilChanged, - switchMap, - debounceTime, - timeoutWith, - startWith, -} from 'rxjs/operators'; -import { isDeepStrictEqual } from 'util'; - -import { PluginName } from '../plugins'; -import { ServiceStatus, CoreStatus, ServiceStatusLevels } from './types'; +import { type PluginName } from '../plugins'; +import { type ServiceStatus, type CoreStatus, ServiceStatusLevels } from './types'; import { getSummaryStatus } from './get_summary_status'; -const STATUS_TIMEOUT_MS = 30 * 1000; // 30 seconds - -interface Deps { +const defaultStatus: ServiceStatus = { + level: ServiceStatusLevels.unavailable, + summary: 'Unknown status', +}; +export interface Deps { core$: Observable; pluginDependencies: ReadonlyMap; } +interface PluginData { + [name: PluginName]: { + name: PluginName; + depth: number; + dependencies: PluginName[]; + reverseDependencies: PluginName[]; + reportedStatus?: ServiceStatus; + derivedStatus: ServiceStatus; + }; +} + +interface UpdatedPlugins { + [name: PluginName]: boolean; +} + +interface PluginStatus { + [name: PluginName]: ServiceStatus; +} + +interface PluginReportedStatus { + [name: PluginName]: Subscription; +} + export class PluginsStatusService { - private readonly pluginStatuses = new Map>(); - private readonly derivedStatuses = new Map>(); - private readonly dependenciesStatuses = new Map< - PluginName, - Observable> - >(); - private allPluginsStatuses?: Observable>; - - private readonly update$ = new BehaviorSubject(true); - private readonly defaultInheritedStatus$: Observable; + private coreStatus: CoreStatus = { elasticsearch: defaultStatus, savedObjects: defaultStatus }; + private pluginData: PluginData; + private rootPlugins: PluginName[]; + private orderedPluginNames: PluginName[]; + private pluginData$ = new ReplaySubject(1); + private pluginStatus: PluginStatus; + private pluginStatus$ = new ReplaySubject(1); + private pluginReportedStatus: PluginReportedStatus = {}; + private updatePluginStatuses$ = new Subject(); private newRegistrationsAllowed = true; constructor(private readonly deps: Deps) { - this.defaultInheritedStatus$ = this.deps.core$.pipe( - map((coreStatus) => { - return getSummaryStatus(Object.entries(coreStatus), { - allAvailableSummary: `All dependencies are available`, - }); - }) - ); + this.pluginStatus = {}; + this.pluginData = this.initPluginData(deps.pluginDependencies); + this.rootPlugins = this.getRootPlugins(); + this.orderedPluginNames = this.getOrderedPluginNames(); + + // console.log('⭐ ROOT PLUGINS', this.rootPlugins.length, this.rootPlugins.join(', ')); + // console.log('⭐ ORDERED PLUGINS', this.orderedPluginNames.length, this.orderedPluginNames.join(', ')); + + this.updatePluginStatuses$ + .asObservable() + .pipe( + bufferTime(50), + filter((plugins) => plugins.length > 0) + ) + .subscribe((plugins) => { + this.updatePluginsStatuses(plugins); + + this.pluginData$.next(this.pluginData); + this.pluginStatus$.next(this.pluginStatus); + }); + + this.deps.core$.pipe(debounceTime(100)).subscribe((coreStatus) => { + this.coreStatus = coreStatus!; + // console.log('⚡ CORE STATUS! elastic: ', coreStatus.elasticsearch.level.toString(), '; savedObjects: ', coreStatus.savedObjects.level.toString()); + const derivedStatus = getSummaryStatus(Object.entries(this.coreStatus), { + allAvailableSummary: `All dependencies are available`, + }); + + this.rootPlugins.forEach((plugin) => { + this.pluginData[plugin].derivedStatus = derivedStatus; + if (!this.pluginData[plugin].reportedStatus) { + // this root plugin has NOT reported any status yet. Thus, its status is derived from core + this.pluginStatus[plugin] = derivedStatus; + } + + this.updatePluginStatuses$.next(plugin); + }); + }); } public set(plugin: PluginName, status$: Observable) { @@ -57,8 +104,21 @@ export class PluginsStatusService { `Custom statuses cannot be registered after setup, plugin [${plugin}] attempted` ); } - this.pluginStatuses.set(plugin, status$); - this.update$.next(true); // trigger all existing Observables to update from the new source Observable + + const subscription = this.pluginReportedStatus[plugin]; + if (subscription) subscription.unsubscribe(); + + this.pluginReportedStatus[plugin] = status$.subscribe((status) => { + // console.log('⭐ Reported status!', plugin, status.level.toString()); + const previousReportedLevel = this.pluginData[plugin].reportedStatus?.level; + + this.pluginData[plugin].reportedStatus = status; + this.pluginStatus[plugin] = status; + + if (status.level !== previousReportedLevel) { + this.updatePluginStatuses$.next(plugin); + } + }); } public blockNewRegistrations() { @@ -66,105 +126,117 @@ export class PluginsStatusService { } public getAll$(): Observable> { - if (!this.allPluginsStatuses) { - this.allPluginsStatuses = this.getPluginStatuses$([...this.deps.pluginDependencies.keys()]); - } - return this.allPluginsStatuses; + return this.pluginStatus$.asObservable(); } public getDependenciesStatus$(plugin: PluginName): Observable> { - const dependencies = this.deps.pluginDependencies.get(plugin); - if (!dependencies) { - throw new Error(`Unknown plugin: ${plugin}`); - } - if (!this.dependenciesStatuses.has(plugin)) { - this.dependenciesStatuses.set( - plugin, - this.getPluginStatuses$(dependencies).pipe( - // Prevent many emissions at once from dependency status resolution from making this too noisy - debounceTime(25) - ) - ); - } - return this.dependenciesStatuses.get(plugin)!; + const directDependencies = this.pluginData[plugin].dependencies; + + return this.pluginStatus$.asObservable().pipe( + map((allStatus) => + Object.keys(allStatus) + .filter((dep) => directDependencies.includes(dep)) + .reduce((acc: PluginStatus, key: PluginName) => { + acc[key] = allStatus[key]; + return acc; + }, {}) + ), + distinctUntilChanged() + ); } public getDerivedStatus$(plugin: PluginName): Observable { - if (!this.derivedStatuses.has(plugin)) { - this.derivedStatuses.set( - plugin, - this.update$.pipe( - debounceTime(25), // Avoid calling the plugin's custom status logic for every plugin that depends on it. - switchMap(() => { - // Only go up the dependency tree if any of this plugin's dependencies have a custom status - // Helps eliminate memory overhead of creating thousands of Observables unnecessarily. - if (this.anyCustomStatuses(plugin)) { - return combineLatest([this.deps.core$, this.getDependenciesStatus$(plugin)]).pipe( - map(([coreStatus, pluginStatuses]) => { - return getSummaryStatus( - [...Object.entries(coreStatus), ...Object.entries(pluginStatuses)], - { - allAvailableSummary: `All dependencies are available`, - } - ); - }) - ); - } else { - return this.defaultInheritedStatus$; - } - }) - ) - ); - } - return this.derivedStatuses.get(plugin)!; + return this.pluginData$.asObservable().pipe( + pluck(plugin, 'derivedStatus'), + filter((status: ServiceStatus | undefined): status is ServiceStatus => !!status), + distinctUntilChanged() + ); } - private getPluginStatuses$(plugins: PluginName[]): Observable> { - if (plugins.length === 0) { - return of({}); + private initPluginData(pluginDependencies: ReadonlyMap): PluginData { + const pluginData: PluginData = {}; + + if (pluginDependencies) { + pluginDependencies.forEach((dependencies, name) => { + pluginData[name] = { + name, + depth: 0, + dependencies, + reverseDependencies: [], + derivedStatus: defaultStatus, + }; + }); + + pluginDependencies.forEach((dependencies, name) => { + dependencies.forEach((dependency) => { + pluginData[dependency].reverseDependencies.push(name); + }); + }); } - return this.update$.pipe( - switchMap(() => { - const pluginStatuses = plugins - .map((depName) => { - const pluginStatus = this.pluginStatuses.get(depName) - ? this.pluginStatuses.get(depName)!.pipe( - timeoutWith( - STATUS_TIMEOUT_MS, - this.pluginStatuses.get(depName)!.pipe( - startWith({ - level: ServiceStatusLevels.unavailable, - summary: `Status check timed out after ${STATUS_TIMEOUT_MS / 1000}s`, - }) - ) - ) - ) - : this.getDerivedStatus$(depName); - return [depName, pluginStatus] as [PluginName, Observable]; - }) - .map(([pName, status$]) => - status$.pipe(map((status) => [pName, status] as [PluginName, ServiceStatus])) - ); - - return combineLatest(pluginStatuses).pipe( - map((statuses) => Object.fromEntries(statuses)), - distinctUntilChanged>(isDeepStrictEqual) - ); - }) + return pluginData; + } + + private getRootPlugins(): PluginName[] { + return Object.keys(this.pluginData).filter( + (plugin) => this.pluginData[plugin].dependencies.length === 0 + ); + } + + private getOrderedPluginNames(): PluginName[] { + this.rootPlugins.forEach((plugin) => { + this.calculateDepthRecursive(plugin, 1); + }); + + return sortBy(Object.values(this.pluginData), ['depth', 'name']).map(({ name }) => name); + } + + private calculateDepthRecursive(plugin: PluginName, depth: number) { + const pluginData = this.pluginData[plugin]; + pluginData.depth = Math.max(pluginData.depth, depth); + const newDepth = depth + 1; + pluginData.reverseDependencies.forEach((revDep) => + this.calculateDepthRecursive(revDep, newDepth) ); } - /** - * Determines whether or not this plugin or any plugin in it's dependency tree have a custom status registered. - */ - private anyCustomStatuses(plugin: PluginName): boolean { - if (this.pluginStatuses.get(plugin)) { - return true; + private updatePluginStatus(plugin: PluginName): void { + const newStatus = this.determinePluginStatus(plugin); + const pluginData = this.pluginData[plugin]; + pluginData.derivedStatus = newStatus; + + if (!pluginData.reportedStatus) { + // this plugin has NOT reported any status yet. Thus, its status is derived from its dependencies + core + this.pluginStatus[plugin] = newStatus; + } + } + + private updatePluginsStatuses(plugins: PluginName[]): void { + const toCheck = new Set(plugins); + for (let i = 0; i < this.orderedPluginNames.length && toCheck.size > 0; ++i) { + const current = this.orderedPluginNames[i]; + if (toCheck.has(current)) { + // update the current plugin status + this.updatePluginStatus(current); + // flag all its reverse dependencies to be checked + this.pluginData[current].reverseDependencies.forEach((revDep) => toCheck.add(revDep)); + } } + } + + private determinePluginStatus(plugin: PluginName): ServiceStatus { + const coreStatus: Array<[PluginName, ServiceStatus]> = Object.entries(this.coreStatus); + const depsStatus: Array<[PluginName, ServiceStatus]> = this.pluginData[plugin].dependencies.map( + (dependency) => [ + dependency, + this.pluginData[dependency].reportedStatus || this.pluginData[dependency].derivedStatus, + ] + ); + + const newStatus = getSummaryStatus([...coreStatus, ...depsStatus], { + allAvailableSummary: `All dependencies are available`, + }); - return this.deps.pluginDependencies - .get(plugin)! - .reduce((acc, depName) => acc || this.anyCustomStatuses(depName), false as boolean); + return newStatus; } } diff --git a/src/core/server/status/status_service.ts b/src/core/server/status/status_service.ts index 63a1b02d5b2e7..f022b74a2c6a4 100644 --- a/src/core/server/status/status_service.ts +++ b/src/core/server/status/status_service.ts @@ -27,6 +27,8 @@ import { ServiceStatus, CoreStatus, InternalStatusServiceSetup } from './types'; import { getSummaryStatus } from './get_summary_status'; import { PluginsStatusService } from './plugins_status'; import { getOverallStatusChanges } from './log_overall_status'; +import { CachedPluginsStatusService } from './cached_plugins_status'; +import { ServiceStatusLevels } from '..'; interface StatusLogMeta extends LogMeta { kibana: { status: ServiceStatus }; @@ -67,7 +69,7 @@ export class StatusService implements CoreService { }: SetupDeps) { const statusConfig = await this.config$.pipe(take(1)).toPromise(); const core$ = this.setupCoreStatus({ elasticsearch, savedObjects }); - this.pluginsStatus = new PluginsStatusService({ core$, pluginDependencies }); + this.pluginsStatus = new CachedPluginsStatusService({ core$, pluginDependencies }); this.overall$ = combineLatest([core$, this.pluginsStatus.getAll$()]).pipe( // Prevent many emissions at once from dependency status resolution from making this too noisy From 02e19a8473066a4af0b1fc11f5d719a5b0e99a44 Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Wed, 23 Mar 2022 10:39:57 +0100 Subject: [PATCH 2/8] Fix behavior when no plugins are defined --- src/core/server/status/plugins_status.ts | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/src/core/server/status/plugins_status.ts b/src/core/server/status/plugins_status.ts index 22853007b2e93..a5c5e0da88e3e 100644 --- a/src/core/server/status/plugins_status.ts +++ b/src/core/server/status/plugins_status.ts @@ -5,7 +5,7 @@ * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ -import { Observable, ReplaySubject, Subject, Subscription } from 'rxjs'; +import { BehaviorSubject, Observable, ReplaySubject, Subject, Subscription } from 'rxjs'; import { map, distinctUntilChanged, pluck, filter, debounceTime, bufferTime } from 'rxjs/operators'; import { sortBy } from 'lodash'; @@ -32,11 +32,6 @@ interface PluginData { derivedStatus: ServiceStatus; }; } - -interface UpdatedPlugins { - [name: PluginName]: boolean; -} - interface PluginStatus { [name: PluginName]: ServiceStatus; } @@ -51,14 +46,13 @@ export class PluginsStatusService { private rootPlugins: PluginName[]; private orderedPluginNames: PluginName[]; private pluginData$ = new ReplaySubject(1); - private pluginStatus: PluginStatus; - private pluginStatus$ = new ReplaySubject(1); + private pluginStatus: PluginStatus = {}; + private pluginStatus$ = new BehaviorSubject(this.pluginStatus); private pluginReportedStatus: PluginReportedStatus = {}; private updatePluginStatuses$ = new Subject(); private newRegistrationsAllowed = true; constructor(private readonly deps: Deps) { - this.pluginStatus = {}; this.pluginData = this.initPluginData(deps.pluginDependencies); this.rootPlugins = this.getRootPlugins(); this.orderedPluginNames = this.getOrderedPluginNames(); @@ -74,12 +68,11 @@ export class PluginsStatusService { ) .subscribe((plugins) => { this.updatePluginsStatuses(plugins); - this.pluginData$.next(this.pluginData); this.pluginStatus$.next(this.pluginStatus); }); - this.deps.core$.pipe(debounceTime(100)).subscribe((coreStatus) => { + this.deps.core$.pipe(debounceTime(50)).subscribe((coreStatus) => { this.coreStatus = coreStatus!; // console.log('⚡ CORE STATUS! elastic: ', coreStatus.elasticsearch.level.toString(), '; savedObjects: ', coreStatus.savedObjects.level.toString()); const derivedStatus = getSummaryStatus(Object.entries(this.coreStatus), { From 6f3ac9274de3755c0bd7ecc31ceb60c80decb05c Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Thu, 24 Mar 2022 09:41:43 +0100 Subject: [PATCH 3/8] Remove unused import, reduce debounce times --- src/core/server/status/plugins_status.ts | 4 ++-- src/core/server/status/status_service.ts | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/core/server/status/plugins_status.ts b/src/core/server/status/plugins_status.ts index a5c5e0da88e3e..98e7ca73048b4 100644 --- a/src/core/server/status/plugins_status.ts +++ b/src/core/server/status/plugins_status.ts @@ -63,7 +63,7 @@ export class PluginsStatusService { this.updatePluginStatuses$ .asObservable() .pipe( - bufferTime(50), + bufferTime(10), filter((plugins) => plugins.length > 0) ) .subscribe((plugins) => { @@ -72,7 +72,7 @@ export class PluginsStatusService { this.pluginStatus$.next(this.pluginStatus); }); - this.deps.core$.pipe(debounceTime(50)).subscribe((coreStatus) => { + this.deps.core$.pipe(debounceTime(10)).subscribe((coreStatus) => { this.coreStatus = coreStatus!; // console.log('⚡ CORE STATUS! elastic: ', coreStatus.elasticsearch.level.toString(), '; savedObjects: ', coreStatus.savedObjects.level.toString()); const derivedStatus = getSummaryStatus(Object.entries(this.coreStatus), { diff --git a/src/core/server/status/status_service.ts b/src/core/server/status/status_service.ts index f022b74a2c6a4..dc79857bbe051 100644 --- a/src/core/server/status/status_service.ts +++ b/src/core/server/status/status_service.ts @@ -28,7 +28,6 @@ import { getSummaryStatus } from './get_summary_status'; import { PluginsStatusService } from './plugins_status'; import { getOverallStatusChanges } from './log_overall_status'; import { CachedPluginsStatusService } from './cached_plugins_status'; -import { ServiceStatusLevels } from '..'; interface StatusLogMeta extends LogMeta { kibana: { status: ServiceStatus }; From 0ee387956dc1e6c75a422c93c000b6e870165492 Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Thu, 24 Mar 2022 13:13:12 +0100 Subject: [PATCH 4/8] Fix startup behavior --- src/core/server/status/plugins_status.ts | 66 +++++++++++++++--------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/src/core/server/status/plugins_status.ts b/src/core/server/status/plugins_status.ts index 98e7ca73048b4..93a3246be504a 100644 --- a/src/core/server/status/plugins_status.ts +++ b/src/core/server/status/plugins_status.ts @@ -5,18 +5,30 @@ * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ -import { BehaviorSubject, Observable, ReplaySubject, Subject, Subscription } from 'rxjs'; -import { map, distinctUntilChanged, pluck, filter, debounceTime, bufferTime } from 'rxjs/operators'; +import { Observable, ReplaySubject, Subject, Subscription } from 'rxjs'; +import { + map, + distinctUntilChanged, + pluck, + filter, + debounceTime, + bufferTime, + timeoutWith, + startWith, +} from 'rxjs/operators'; import { sortBy } from 'lodash'; import { type PluginName } from '../plugins'; import { type ServiceStatus, type CoreStatus, ServiceStatusLevels } from './types'; import { getSummaryStatus } from './get_summary_status'; +const STATUS_TIMEOUT_MS = 30 * 1000; // 30 seconds + const defaultStatus: ServiceStatus = { level: ServiceStatusLevels.unavailable, - summary: 'Unknown status', + summary: `Status check timed out after ${STATUS_TIMEOUT_MS / 1000}s`, }; + export interface Deps { core$: Observable; pluginDependencies: ReadonlyMap; @@ -47,7 +59,7 @@ export class PluginsStatusService { private orderedPluginNames: PluginName[]; private pluginData$ = new ReplaySubject(1); private pluginStatus: PluginStatus = {}; - private pluginStatus$ = new BehaviorSubject(this.pluginStatus); + private pluginStatus$ = new ReplaySubject(1); private pluginReportedStatus: PluginReportedStatus = {}; private updatePluginStatuses$ = new Subject(); private newRegistrationsAllowed = true; @@ -57,13 +69,10 @@ export class PluginsStatusService { this.rootPlugins = this.getRootPlugins(); this.orderedPluginNames = this.getOrderedPluginNames(); - // console.log('⭐ ROOT PLUGINS', this.rootPlugins.length, this.rootPlugins.join(', ')); - // console.log('⭐ ORDERED PLUGINS', this.orderedPluginNames.length, this.orderedPluginNames.join(', ')); - this.updatePluginStatuses$ .asObservable() .pipe( - bufferTime(10), + bufferTime(25), filter((plugins) => plugins.length > 0) ) .subscribe((plugins) => { @@ -72,17 +81,16 @@ export class PluginsStatusService { this.pluginStatus$.next(this.pluginStatus); }); - this.deps.core$.pipe(debounceTime(10)).subscribe((coreStatus) => { + this.deps.core$.pipe(debounceTime(25)).subscribe((coreStatus) => { this.coreStatus = coreStatus!; - // console.log('⚡ CORE STATUS! elastic: ', coreStatus.elasticsearch.level.toString(), '; savedObjects: ', coreStatus.savedObjects.level.toString()); const derivedStatus = getSummaryStatus(Object.entries(this.coreStatus), { allAvailableSummary: `All dependencies are available`, }); this.rootPlugins.forEach((plugin) => { this.pluginData[plugin].derivedStatus = derivedStatus; - if (!this.pluginData[plugin].reportedStatus) { - // this root plugin has NOT reported any status yet. Thus, its status is derived from core + if (!this.pluginReportedStatus[plugin]) { + // this root plugin has NOT registered any status Observable. Thus, its status is derived from core this.pluginStatus[plugin] = derivedStatus; } @@ -101,17 +109,22 @@ export class PluginsStatusService { const subscription = this.pluginReportedStatus[plugin]; if (subscription) subscription.unsubscribe(); - this.pluginReportedStatus[plugin] = status$.subscribe((status) => { - // console.log('⭐ Reported status!', plugin, status.level.toString()); - const previousReportedLevel = this.pluginData[plugin].reportedStatus?.level; + this.pluginReportedStatus[plugin] = status$ + // Set a timeout for custom status Observables + .pipe(timeoutWith(STATUS_TIMEOUT_MS, status$.pipe(startWith(defaultStatus)))) + .subscribe((status) => { + const previousReportedLevel = this.pluginData[plugin].reportedStatus?.level; - this.pluginData[plugin].reportedStatus = status; - this.pluginStatus[plugin] = status; + this.pluginData[plugin].reportedStatus = status; + this.pluginStatus[plugin] = status; - if (status.level !== previousReportedLevel) { - this.updatePluginStatuses$.next(plugin); - } - }); + if (status.level !== previousReportedLevel) { + this.updatePluginStatuses$.next(plugin); + } + }); + + // delete any derived statuses calculated before the custom status observable was registered + delete this.pluginStatus[plugin]; } public blockNewRegistrations() { @@ -119,13 +132,16 @@ export class PluginsStatusService { } public getAll$(): Observable> { - return this.pluginStatus$.asObservable(); + return this.pluginStatus$.asObservable().pipe( + // do not emit until we have a status for all plugins + filter((all) => Object.keys(all).length === this.orderedPluginNames.length) + ); } public getDependenciesStatus$(plugin: PluginName): Observable> { const directDependencies = this.pluginData[plugin].dependencies; - return this.pluginStatus$.asObservable().pipe( + return this.getAll$().pipe( map((allStatus) => Object.keys(allStatus) .filter((dep) => directDependencies.includes(dep)) @@ -198,8 +214,8 @@ export class PluginsStatusService { const pluginData = this.pluginData[plugin]; pluginData.derivedStatus = newStatus; - if (!pluginData.reportedStatus) { - // this plugin has NOT reported any status yet. Thus, its status is derived from its dependencies + core + if (!this.pluginReportedStatus[plugin]) { + // this plugin has NOT registered any status Observable. Thus, its status is derived from its dependencies + core this.pluginStatus[plugin] = newStatus; } } From 7c71a628f45af9f9227300115ea0179348bfd9af Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Thu, 24 Mar 2022 13:55:45 +0100 Subject: [PATCH 5/8] Misc improvements following PR comments --- .../server/status/cached_plugins_status.ts | 4 +-- src/core/server/status/plugins_status.ts | 36 ++++++++++--------- src/core/server/status/status_service.ts | 7 ++-- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/src/core/server/status/cached_plugins_status.ts b/src/core/server/status/cached_plugins_status.ts index c598d09633b52..fec9f51e63172 100644 --- a/src/core/server/status/cached_plugins_status.ts +++ b/src/core/server/status/cached_plugins_status.ts @@ -11,9 +11,9 @@ import { Observable } from 'rxjs'; import { type PluginName } from '../plugins'; import { type ServiceStatus } from './types'; -import { type Deps, PluginsStatusService } from './plugins_status'; +import { type Deps, PluginsStatusService as BasePluginsStatusService } from './plugins_status'; -export class CachedPluginsStatusService extends PluginsStatusService { +export class PluginsStatusService extends BasePluginsStatusService { private all$?: Observable>; private dependenciesStatuses$: Record>>; private derivedStatuses$: Record>; diff --git a/src/core/server/status/plugins_status.ts b/src/core/server/status/plugins_status.ts index 93a3246be504a..a9836a6ad09a5 100644 --- a/src/core/server/status/plugins_status.ts +++ b/src/core/server/status/plugins_status.ts @@ -9,7 +9,6 @@ import { Observable, ReplaySubject, Subject, Subscription } from 'rxjs'; import { map, distinctUntilChanged, - pluck, filter, debounceTime, bufferTime, @@ -48,7 +47,7 @@ interface PluginStatus { [name: PluginName]: ServiceStatus; } -interface PluginReportedStatus { +interface ReportedStatusSubscriptions { [name: PluginName]: Subscription; } @@ -60,7 +59,7 @@ export class PluginsStatusService { private pluginData$ = new ReplaySubject(1); private pluginStatus: PluginStatus = {}; private pluginStatus$ = new ReplaySubject(1); - private pluginReportedStatus: PluginReportedStatus = {}; + private reportedStatusSubscriptions: ReportedStatusSubscriptions = {}; private updatePluginStatuses$ = new Subject(); private newRegistrationsAllowed = true; @@ -89,7 +88,7 @@ export class PluginsStatusService { this.rootPlugins.forEach((plugin) => { this.pluginData[plugin].derivedStatus = derivedStatus; - if (!this.pluginReportedStatus[plugin]) { + if (!this.reportedStatusSubscriptions[plugin]) { // this root plugin has NOT registered any status Observable. Thus, its status is derived from core this.pluginStatus[plugin] = derivedStatus; } @@ -106,10 +105,10 @@ export class PluginsStatusService { ); } - const subscription = this.pluginReportedStatus[plugin]; - if (subscription) subscription.unsubscribe(); + // unsubscribe from any previous subscriptions. Ideally plugins should register a status Observable only once + this.reportedStatusSubscriptions[plugin]?.unsubscribe(); - this.pluginReportedStatus[plugin] = status$ + this.reportedStatusSubscriptions[plugin] = status$ // Set a timeout for custom status Observables .pipe(timeoutWith(STATUS_TIMEOUT_MS, status$.pipe(startWith(defaultStatus)))) .subscribe((status) => { @@ -142,26 +141,29 @@ export class PluginsStatusService { const directDependencies = this.pluginData[plugin].dependencies; return this.getAll$().pipe( - map((allStatus) => - Object.keys(allStatus) - .filter((dep) => directDependencies.includes(dep)) - .reduce((acc: PluginStatus, key: PluginName) => { - acc[key] = allStatus[key]; - return acc; - }, {}) - ), + map((allStatus) => { + const dependenciesStatus: Record = {}; + directDependencies.forEach((dep) => (dependenciesStatus[dep] = allStatus[dep])); + return dependenciesStatus; + }), distinctUntilChanged() ); } public getDerivedStatus$(plugin: PluginName): Observable { return this.pluginData$.asObservable().pipe( - pluck(plugin, 'derivedStatus'), + map((pluginData) => pluginData[plugin]?.derivedStatus), filter((status: ServiceStatus | undefined): status is ServiceStatus => !!status), distinctUntilChanged() ); } + public stop() { + Object.values(this.reportedStatusSubscriptions).forEach((subscription) => { + subscription.unsubscribe(); + }); + } + private initPluginData(pluginDependencies: ReadonlyMap): PluginData { const pluginData: PluginData = {}; @@ -214,7 +216,7 @@ export class PluginsStatusService { const pluginData = this.pluginData[plugin]; pluginData.derivedStatus = newStatus; - if (!this.pluginReportedStatus[plugin]) { + if (!this.reportedStatusSubscriptions[plugin]) { // this plugin has NOT registered any status Observable. Thus, its status is derived from its dependencies + core this.pluginStatus[plugin] = newStatus; } diff --git a/src/core/server/status/status_service.ts b/src/core/server/status/status_service.ts index dc79857bbe051..c3df52ec9f360 100644 --- a/src/core/server/status/status_service.ts +++ b/src/core/server/status/status_service.ts @@ -25,9 +25,8 @@ import type { InternalCoreUsageDataSetup } from '../core_usage_data'; import { config, StatusConfigType } from './status_config'; import { ServiceStatus, CoreStatus, InternalStatusServiceSetup } from './types'; import { getSummaryStatus } from './get_summary_status'; -import { PluginsStatusService } from './plugins_status'; +import { PluginsStatusService } from './cached_plugins_status'; import { getOverallStatusChanges } from './log_overall_status'; -import { CachedPluginsStatusService } from './cached_plugins_status'; interface StatusLogMeta extends LogMeta { kibana: { status: ServiceStatus }; @@ -68,7 +67,7 @@ export class StatusService implements CoreService { }: SetupDeps) { const statusConfig = await this.config$.pipe(take(1)).toPromise(); const core$ = this.setupCoreStatus({ elasticsearch, savedObjects }); - this.pluginsStatus = new CachedPluginsStatusService({ core$, pluginDependencies }); + this.pluginsStatus = new PluginsStatusService({ core$, pluginDependencies }); this.overall$ = combineLatest([core$, this.pluginsStatus.getAll$()]).pipe( // Prevent many emissions at once from dependency status resolution from making this too noisy @@ -175,6 +174,8 @@ export class StatusService implements CoreService { this.subscriptions.forEach((subscription) => { subscription.unsubscribe(); }); + + this.pluginsStatus?.stop(); this.subscriptions = []; } From 3d64fedb8b5b542517869a28763045505e052807 Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Thu, 24 Mar 2022 19:41:52 +0100 Subject: [PATCH 6/8] Fix plugin_status UTs --- src/core/server/status/plugins_status.test.ts | 39 ++--- src/core/server/status/plugins_status.ts | 149 +++++++++++++----- src/core/server/status/status_service.test.ts | 32 ++-- src/core/server/status/status_service.ts | 2 +- 4 files changed, 147 insertions(+), 75 deletions(-) diff --git a/src/core/server/status/plugins_status.test.ts b/src/core/server/status/plugins_status.test.ts index d0584eb6d8e74..279af27a8f64f 100644 --- a/src/core/server/status/plugins_status.test.ts +++ b/src/core/server/status/plugins_status.test.ts @@ -10,13 +10,13 @@ import { PluginName } from '../plugins'; import { PluginsStatusService } from './plugins_status'; import { of, Observable, BehaviorSubject, ReplaySubject } from 'rxjs'; import { ServiceStatusLevels, CoreStatus, ServiceStatus } from './types'; -import { first } from 'rxjs/operators'; +import { first, skip } from 'rxjs/operators'; import { ServiceStatusLevelSnapshotSerializer } from './test_utils'; expect.addSnapshotSerializer(ServiceStatusLevelSnapshotSerializer); // FIXME temporarily skipping these tests to asses performance of this solution for https://github.com/elastic/kibana/issues/128061 -describe.skip('PluginStatusService', () => { +describe('PluginStatusService', () => { const coreAllAvailable$: Observable = of({ elasticsearch: { level: ServiceStatusLevels.available, summary: 'elasticsearch avail' }, savedObjects: { level: ServiceStatusLevels.available, summary: 'savedObjects avail' }, @@ -216,7 +216,7 @@ describe.skip('PluginStatusService', () => { service.set('a', of({ level: ServiceStatusLevels.available, summary: 'a status' })); expect(await service.getAll$().pipe(first()).toPromise()).toEqual({ - a: { level: ServiceStatusLevels.available, summary: 'a status' }, // a is available depsite savedObjects being degraded + a: { level: ServiceStatusLevels.available, summary: 'a status' }, // a is available despite savedObjects being degraded b: { level: ServiceStatusLevels.degraded, summary: '1 service is degraded: savedObjects', @@ -240,6 +240,7 @@ describe.skip('PluginStatusService', () => { const statusUpdates: Array> = []; const subscription = service .getAll$() + .pipe(skip(1)) // the first emission happens right after core services emit .subscribe((pluginStatuses) => statusUpdates.push(pluginStatuses)); service.set('a', of({ level: ServiceStatusLevels.degraded, summary: 'a degraded' })); @@ -262,6 +263,7 @@ describe.skip('PluginStatusService', () => { const statusUpdates: Array> = []; const subscription = service .getAll$() + .pipe(skip(1)) // the first emission happens right after core services emit .subscribe((pluginStatuses) => statusUpdates.push(pluginStatuses)); const aStatus$ = new BehaviorSubject({ @@ -281,19 +283,21 @@ describe.skip('PluginStatusService', () => { }); it('emits an unavailable status if first emission times out, then continues future emissions', async () => { - jest.useFakeTimers(); - const service = new PluginsStatusService({ - core$: coreAllAvailable$, - pluginDependencies: new Map([ - ['a', []], - ['b', ['a']], - ]), - }); + const service = new PluginsStatusService( + { + core$: coreAllAvailable$, + pluginDependencies: new Map([ + ['a', []], + ['b', ['a']], + ]), + }, + 10 + ); const pluginA$ = new ReplaySubject(1); service.set('a', pluginA$); - const firstEmission = service.getAll$().pipe(first()).toPromise(); - jest.runAllTimers(); + // the first emission happens right after core services emit + const firstEmission = service.getAll$().pipe(skip(1), first()).toPromise(); expect(await firstEmission).toEqual({ a: { level: ServiceStatusLevels.unavailable, summary: 'Status check timed out after 30s' }, @@ -309,16 +313,16 @@ describe.skip('PluginStatusService', () => { pluginA$.next({ level: ServiceStatusLevels.available, summary: 'a available' }); const secondEmission = service.getAll$().pipe(first()).toPromise(); - jest.runAllTimers(); expect(await secondEmission).toEqual({ a: { level: ServiceStatusLevels.available, summary: 'a available' }, b: { level: ServiceStatusLevels.available, summary: 'All dependencies are available' }, }); - jest.useRealTimers(); }); }); describe('getDependenciesStatus$', () => { + const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + it('only includes dependencies of specified plugin', async () => { const service = new PluginsStatusService({ core$: coreAllAvailable$, @@ -358,7 +362,7 @@ describe.skip('PluginStatusService', () => { it('debounces plugins custom status registration', async () => { const service = new PluginsStatusService({ - core$: coreAllAvailable$, + core$: coreOneCriticalOneDegraded$, pluginDependencies, }); const available: ServiceStatus = { @@ -376,8 +380,6 @@ describe.skip('PluginStatusService', () => { expect(statusUpdates).toStrictEqual([]); - const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); - // Waiting for the debounce timeout should cut a new update await delay(25); subscription.unsubscribe(); @@ -405,7 +407,6 @@ describe.skip('PluginStatusService', () => { const subscription = service .getDependenciesStatus$('b') .subscribe((status) => statusUpdates.push(status)); - const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); pluginA$.next(degraded); pluginA$.next(available); diff --git a/src/core/server/status/plugins_status.ts b/src/core/server/status/plugins_status.ts index a9836a6ad09a5..6b9173a8f133b 100644 --- a/src/core/server/status/plugins_status.ts +++ b/src/core/server/status/plugins_status.ts @@ -5,17 +5,17 @@ * in compliance with, at your election, the Elastic License 2.0 or the Server * Side Public License, v 1. */ -import { Observable, ReplaySubject, Subject, Subscription } from 'rxjs'; +import { BehaviorSubject, Observable, ReplaySubject, Subscription } from 'rxjs'; import { map, distinctUntilChanged, filter, debounceTime, - bufferTime, timeoutWith, startWith, } from 'rxjs/operators'; import { sortBy } from 'lodash'; +import { isDeepStrictEqual } from 'util'; import { type PluginName } from '../plugins'; import { type ServiceStatus, type CoreStatus, ServiceStatusLevels } from './types'; @@ -36,7 +36,7 @@ export interface Deps { interface PluginData { [name: PluginName]: { name: PluginName; - depth: number; + depth: number; // depth of this plugin in the dependency tree (root plugins will have depth = 1) dependencies: PluginName[]; reverseDependencies: PluginName[]; reportedStatus?: ServiceStatus; @@ -54,33 +54,22 @@ interface ReportedStatusSubscriptions { export class PluginsStatusService { private coreStatus: CoreStatus = { elasticsearch: defaultStatus, savedObjects: defaultStatus }; private pluginData: PluginData; - private rootPlugins: PluginName[]; + private rootPlugins: PluginName[]; // root plugins are those that do not have any dependencies private orderedPluginNames: PluginName[]; private pluginData$ = new ReplaySubject(1); private pluginStatus: PluginStatus = {}; - private pluginStatus$ = new ReplaySubject(1); + private pluginStatus$ = new BehaviorSubject(this.pluginStatus); private reportedStatusSubscriptions: ReportedStatusSubscriptions = {}; - private updatePluginStatuses$ = new Subject(); + private isReportingStatus: Record = {}; private newRegistrationsAllowed = true; + private coreSubscription: Subscription; - constructor(private readonly deps: Deps) { + constructor(private readonly deps: Deps, private readonly statusTimeoutMs = STATUS_TIMEOUT_MS) { this.pluginData = this.initPluginData(deps.pluginDependencies); this.rootPlugins = this.getRootPlugins(); this.orderedPluginNames = this.getOrderedPluginNames(); - this.updatePluginStatuses$ - .asObservable() - .pipe( - bufferTime(25), - filter((plugins) => plugins.length > 0) - ) - .subscribe((plugins) => { - this.updatePluginsStatuses(plugins); - this.pluginData$.next(this.pluginData); - this.pluginStatus$.next(this.pluginStatus); - }); - - this.deps.core$.pipe(debounceTime(25)).subscribe((coreStatus) => { + this.coreSubscription = this.deps.core$.pipe(debounceTime(25)).subscribe((coreStatus) => { this.coreStatus = coreStatus!; const derivedStatus = getSummaryStatus(Object.entries(this.coreStatus), { allAvailableSummary: `All dependencies are available`, @@ -88,16 +77,22 @@ export class PluginsStatusService { this.rootPlugins.forEach((plugin) => { this.pluginData[plugin].derivedStatus = derivedStatus; - if (!this.reportedStatusSubscriptions[plugin]) { + if (!this.isReportingStatus[plugin]) { // this root plugin has NOT registered any status Observable. Thus, its status is derived from core this.pluginStatus[plugin] = derivedStatus; } - - this.updatePluginStatuses$.next(plugin); }); + + this.updatePluginsStatuses(this.rootPlugins); }); } + /** + * Register a status Observable for a specific plugin + * @param {PluginName} plugin The name of the plugin + * @param {Observable} status$ An external Observable that must be trusted as the source of truth for the status of the plugin + * @throws An error if the status registrations are not allowed + */ public set(plugin: PluginName, status$: Observable) { if (!this.newRegistrationsAllowed) { throw new Error( @@ -106,11 +101,15 @@ export class PluginsStatusService { } // unsubscribe from any previous subscriptions. Ideally plugins should register a status Observable only once + this.isReportingStatus[plugin] = true; this.reportedStatusSubscriptions[plugin]?.unsubscribe(); + // delete any derived statuses calculated before the custom status observable was registered + delete this.pluginStatus[plugin]; + this.reportedStatusSubscriptions[plugin] = status$ // Set a timeout for custom status Observables - .pipe(timeoutWith(STATUS_TIMEOUT_MS, status$.pipe(startWith(defaultStatus)))) + .pipe(timeoutWith(this.statusTimeoutMs, status$.pipe(startWith(defaultStatus)))) .subscribe((status) => { const previousReportedLevel = this.pluginData[plugin].reportedStatus?.level; @@ -118,18 +117,22 @@ export class PluginsStatusService { this.pluginStatus[plugin] = status; if (status.level !== previousReportedLevel) { - this.updatePluginStatuses$.next(plugin); + this.updatePluginsStatuses([plugin]); } }); - - // delete any derived statuses calculated before the custom status observable was registered - delete this.pluginStatus[plugin]; } + /** + * Prevent plugins from registering status Observables + */ public blockNewRegistrations() { this.newRegistrationsAllowed = false; } + /** + * Obtain an Observable of the status of all the plugins + * @returns {Observable>} An Observable that will yield the current status of all plugins + */ public getAll$(): Observable> { return this.pluginStatus$.asObservable().pipe( // do not emit until we have a status for all plugins @@ -137,6 +140,11 @@ export class PluginsStatusService { ); } + /** + * Obtain an Observable of the status of the dependencies of the given plugin + * @param {PluginName} plugin the name of the plugin whose dependencies' status must be retreived + * @returns {Observable>} An Observable that will yield the current status of the plugin's dependencies + */ public getDependenciesStatus$(plugin: PluginName): Observable> { const directDependencies = this.pluginData[plugin].dependencies; @@ -146,24 +154,41 @@ export class PluginsStatusService { directDependencies.forEach((dep) => (dependenciesStatus[dep] = allStatus[dep])); return dependenciesStatus; }), - distinctUntilChanged() + debounceTime(10), + distinctUntilChanged>(isDeepStrictEqual) ); } + /** + * Obtain an Observable of the derived status of the given plugin + * @param {PluginName} plugin the name of the plugin whose derived status must be retrieved + * @returns {Observable} An Observable that will yield the derived status of the plugin + */ public getDerivedStatus$(plugin: PluginName): Observable { return this.pluginData$.asObservable().pipe( map((pluginData) => pluginData[plugin]?.derivedStatus), filter((status: ServiceStatus | undefined): status is ServiceStatus => !!status), - distinctUntilChanged() + distinctUntilChanged(isDeepStrictEqual) ); } + /** + * Hook to be called at the stop lifecycle event + */ public stop() { + // Cancel all active subscriptions + this.coreSubscription.unsubscribe(); Object.values(this.reportedStatusSubscriptions).forEach((subscription) => { subscription.unsubscribe(); }); } + /** + * Initialize a convenience data structure + * that maintain up-to-date information about the plugins and their statuses + * @param {ReadonlyMap} pluginDependencies Information about the different plugins and their dependencies + * @returns {PluginData} + */ private initPluginData(pluginDependencies: ReadonlyMap): PluginData { const pluginData: PluginData = {}; @@ -188,12 +213,22 @@ export class PluginsStatusService { return pluginData; } + /** + * Create a list with all the root plugins. + * Root plugins are all those plugins that do not have any dependency. + * @returns {PluginName[]} a list with all the root plugins present in the provided deps + */ private getRootPlugins(): PluginName[] { return Object.keys(this.pluginData).filter( (plugin) => this.pluginData[plugin].dependencies.length === 0 ); } + /** + * Obtain a list of plugins names, ordered by depth. + * @see {calculateDepthRecursive} + * @returns {PluginName[]} a list of plugins, ordered by depth + name + */ private getOrderedPluginNames(): PluginName[] { this.rootPlugins.forEach((plugin) => { this.calculateDepthRecursive(plugin, 1); @@ -202,7 +237,15 @@ export class PluginsStatusService { return sortBy(Object.values(this.pluginData), ['depth', 'name']).map(({ name }) => name); } - private calculateDepthRecursive(plugin: PluginName, depth: number) { + /** + * Calculate the depth of the given plugin, knowing that it's has at least the specified depth + * The depth of a plugin is determined by how many levels of dependencies the plugin has above it. + * We define root plugins as depth = 1, plugins that only depend on root plugins will have depth = 2 + * and so on so forth + * @param {PluginName} plugin the name of the plugin whose depth must be calculated + * @param {number} depth the minimum depth that we know for sure this plugin has + */ + private calculateDepthRecursive(plugin: PluginName, depth: number): void { const pluginData = this.pluginData[plugin]; pluginData.depth = Math.max(pluginData.depth, depth); const newDepth = depth + 1; @@ -211,20 +254,34 @@ export class PluginsStatusService { ); } + /** + * Determine the derived status of the specified plugin and update it on the pluginData structure + * Optionally, if the plugin has not registered a custom status Observable, update its "current" status as well + * @param {PluginName} plugin The name of the plugin to be updated + */ private updatePluginStatus(plugin: PluginName): void { const newStatus = this.determinePluginStatus(plugin); - const pluginData = this.pluginData[plugin]; - pluginData.derivedStatus = newStatus; - if (!this.reportedStatusSubscriptions[plugin]) { + this.pluginData[plugin].derivedStatus = newStatus; + + if (!this.isReportingStatus[plugin]) { // this plugin has NOT registered any status Observable. Thus, its status is derived from its dependencies + core this.pluginStatus[plugin] = newStatus; } } + /** + * Determine the derived statuses of the specified plugins and their dependencies, + * updating them on the pluginData structure + * Optionally, if the plugins have not registered a custom status Observable, update their "current" status as well. + * @param {PluginName[]} plugins The names of the plugins to be updated + */ private updatePluginsStatuses(plugins: PluginName[]): void { const toCheck = new Set(plugins); - for (let i = 0; i < this.orderedPluginNames.length && toCheck.size > 0; ++i) { + + // Note that we are updating the plugins in an ordered fashion + // this way, when updating plugin X (at depth = N), all of its dependencies (at depth < N) have already been updated + for (let i = 0; i < this.orderedPluginNames.length; ++i) { const current = this.orderedPluginNames[i]; if (toCheck.has(current)) { // update the current plugin status @@ -233,16 +290,30 @@ export class PluginsStatusService { this.pluginData[current].reverseDependencies.forEach((revDep) => toCheck.add(revDep)); } } + + this.pluginData$.next(this.pluginData); + this.pluginStatus$.next({ ...this.pluginStatus }); } + /** + * Deterime the current plugin status, taking into account its reported status, its derived status + * and the status of the core services + * @param {PluginName} plugin the name of the plugin whose status must be determined + * @returns {ServiceStatus} The status of the plugin + */ private determinePluginStatus(plugin: PluginName): ServiceStatus { const coreStatus: Array<[PluginName, ServiceStatus]> = Object.entries(this.coreStatus); - const depsStatus: Array<[PluginName, ServiceStatus]> = this.pluginData[plugin].dependencies.map( - (dependency) => [ + const newLocal = this.pluginData[plugin]; + + let depsStatus: Array<[PluginName, ServiceStatus]> = []; + + if (Object.keys(this.isReportingStatus).length) { + // if at least one plugin has registered a status Observable... take into account plugin dependencies + depsStatus = newLocal.dependencies.map((dependency) => [ dependency, this.pluginData[dependency].reportedStatus || this.pluginData[dependency].derivedStatus, - ] - ); + ]); + } const newStatus = getSummaryStatus([...coreStatus, ...depsStatus], { allAvailableSummary: `All dependencies are available`, diff --git a/src/core/server/status/status_service.test.ts b/src/core/server/status/status_service.test.ts index dfd0ff9a7e103..3ed37c23707ef 100644 --- a/src/core/server/status/status_service.test.ts +++ b/src/core/server/status/status_service.test.ts @@ -239,20 +239,20 @@ describe('StatusService', () => { // Wait for timers to ensure that duplicate events are still filtered out regardless of debouncing. elasticsearch$.next(available); - await delay(500); + await delay(200); elasticsearch$.next(available); - await delay(500); + await delay(200); elasticsearch$.next({ level: ServiceStatusLevels.available, summary: `Wow another summary`, }); - await delay(500); + await delay(200); savedObjects$.next(degraded); - await delay(500); + await delay(200); savedObjects$.next(available); - await delay(500); + await delay(200); savedObjects$.next(available); - await delay(500); + await delay(200); subscription.unsubscribe(); expect(statusUpdates).toMatchInlineSnapshot(` @@ -300,9 +300,9 @@ describe('StatusService', () => { savedObjects$.next(available); savedObjects$.next(degraded); // Waiting for the debounce timeout should cut a new update - await delay(500); + await delay(230); savedObjects$.next(available); - await delay(500); + await delay(230); subscription.unsubscribe(); expect(statusUpdates).toMatchInlineSnapshot(` @@ -410,20 +410,20 @@ describe('StatusService', () => { // Wait for timers to ensure that duplicate events are still filtered out regardless of debouncing. elasticsearch$.next(available); - await delay(500); + await delay(200); elasticsearch$.next(available); - await delay(500); + await delay(200); elasticsearch$.next({ level: ServiceStatusLevels.available, summary: `Wow another summary`, }); - await delay(500); + await delay(200); savedObjects$.next(degraded); - await delay(500); + await delay(200); savedObjects$.next(available); - await delay(500); + await delay(200); savedObjects$.next(available); - await delay(500); + await delay(200); subscription.unsubscribe(); expect(statusUpdates).toMatchInlineSnapshot(` @@ -471,9 +471,9 @@ describe('StatusService', () => { savedObjects$.next(available); savedObjects$.next(degraded); // Waiting for the debounce timeout should cut a new update - await delay(500); + await delay(200); savedObjects$.next(available); - await delay(500); + await delay(200); subscription.unsubscribe(); expect(statusUpdates).toMatchInlineSnapshot(` diff --git a/src/core/server/status/status_service.ts b/src/core/server/status/status_service.ts index c3df52ec9f360..a7e5aff408379 100644 --- a/src/core/server/status/status_service.ts +++ b/src/core/server/status/status_service.ts @@ -71,7 +71,7 @@ export class StatusService implements CoreService { this.overall$ = combineLatest([core$, this.pluginsStatus.getAll$()]).pipe( // Prevent many emissions at once from dependency status resolution from making this too noisy - debounceTime(500), + debounceTime(180), map(([coreStatus, pluginsStatus]) => { const summary = getSummaryStatus([ ...Object.entries(coreStatus), From 1301ded2205c0336634646c9e8aa4eb49b2f9647 Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Fri, 25 Mar 2022 09:48:55 +0100 Subject: [PATCH 7/8] Code cleanup + enhancements --- src/core/server/status/plugins_status.test.ts | 12 +- src/core/server/status/plugins_status.ts | 108 ++++++++++-------- src/core/server/status/status_service.test.ts | 32 +++--- src/core/server/status/status_service.ts | 2 +- 4 files changed, 88 insertions(+), 66 deletions(-) diff --git a/src/core/server/status/plugins_status.test.ts b/src/core/server/status/plugins_status.test.ts index 279af27a8f64f..efa2342bb9ca5 100644 --- a/src/core/server/status/plugins_status.test.ts +++ b/src/core/server/status/plugins_status.test.ts @@ -240,7 +240,10 @@ describe('PluginStatusService', () => { const statusUpdates: Array> = []; const subscription = service .getAll$() - .pipe(skip(1)) // the first emission happens right after core services emit + // If we subscribe to the $getAll() Observable BEFORE setting a custom status Observable + // for a given plugin ('a' in this test), then the first emission will happen + // right after core$ services Observable emits + .pipe(skip(1)) .subscribe((pluginStatuses) => statusUpdates.push(pluginStatuses)); service.set('a', of({ level: ServiceStatusLevels.degraded, summary: 'a degraded' })); @@ -263,7 +266,8 @@ describe('PluginStatusService', () => { const statusUpdates: Array> = []; const subscription = service .getAll$() - .pipe(skip(1)) // the first emission happens right after core services emit + // the first emission happens right after core services emit (see explanation above) + .pipe(skip(1)) .subscribe((pluginStatuses) => statusUpdates.push(pluginStatuses)); const aStatus$ = new BehaviorSubject({ @@ -291,12 +295,12 @@ describe('PluginStatusService', () => { ['b', ['a']], ]), }, - 10 + 10 // set a small timeout so that the registered status Observable for 'a' times out quickly ); const pluginA$ = new ReplaySubject(1); service.set('a', pluginA$); - // the first emission happens right after core services emit + // the first emission happens right after core$ services emit const firstEmission = service.getAll$().pipe(skip(1), first()).toPromise(); expect(await firstEmission).toEqual({ diff --git a/src/core/server/status/plugins_status.ts b/src/core/server/status/plugins_status.ts index 6b9173a8f133b..8d042d4cba3f9 100644 --- a/src/core/server/status/plugins_status.ts +++ b/src/core/server/status/plugins_status.ts @@ -64,27 +64,14 @@ export class PluginsStatusService { private newRegistrationsAllowed = true; private coreSubscription: Subscription; - constructor(private readonly deps: Deps, private readonly statusTimeoutMs = STATUS_TIMEOUT_MS) { + constructor(deps: Deps, private readonly statusTimeoutMs: number = STATUS_TIMEOUT_MS) { this.pluginData = this.initPluginData(deps.pluginDependencies); this.rootPlugins = this.getRootPlugins(); this.orderedPluginNames = this.getOrderedPluginNames(); - this.coreSubscription = this.deps.core$.pipe(debounceTime(25)).subscribe((coreStatus) => { - this.coreStatus = coreStatus!; - const derivedStatus = getSummaryStatus(Object.entries(this.coreStatus), { - allAvailableSummary: `All dependencies are available`, - }); - - this.rootPlugins.forEach((plugin) => { - this.pluginData[plugin].derivedStatus = derivedStatus; - if (!this.isReportingStatus[plugin]) { - // this root plugin has NOT registered any status Observable. Thus, its status is derived from core - this.pluginStatus[plugin] = derivedStatus; - } - }); - - this.updatePluginsStatuses(this.rootPlugins); - }); + this.coreSubscription = deps.core$ + .pipe(debounceTime(10)) + .subscribe((coreStatus: CoreStatus) => this.updateCoreAndPluginStatuses(coreStatus)); } /** @@ -100,26 +87,17 @@ export class PluginsStatusService { ); } - // unsubscribe from any previous subscriptions. Ideally plugins should register a status Observable only once this.isReportingStatus[plugin] = true; + // unsubscribe from any previous subscriptions. Ideally plugins should register a status Observable only once this.reportedStatusSubscriptions[plugin]?.unsubscribe(); - // delete any derived statuses calculated before the custom status observable was registered + // delete any derived statuses calculated before the custom status Observable was registered delete this.pluginStatus[plugin]; this.reportedStatusSubscriptions[plugin] = status$ - // Set a timeout for custom status Observables + // Set a timeout for externally-defined status Observables .pipe(timeoutWith(this.statusTimeoutMs, status$.pipe(startWith(defaultStatus)))) - .subscribe((status) => { - const previousReportedLevel = this.pluginData[plugin].reportedStatus?.level; - - this.pluginData[plugin].reportedStatus = status; - this.pluginStatus[plugin] = status; - - if (status.level !== previousReportedLevel) { - this.updatePluginsStatuses([plugin]); - } - }); + .subscribe((status) => this.updatePluginReportedStatus(plugin, status)); } /** @@ -136,7 +114,8 @@ export class PluginsStatusService { public getAll$(): Observable> { return this.pluginStatus$.asObservable().pipe( // do not emit until we have a status for all plugins - filter((all) => Object.keys(all).length === this.orderedPluginNames.length) + filter((all) => Object.keys(all).length === this.orderedPluginNames.length), + distinctUntilChanged>(isDeepStrictEqual) ); } @@ -154,8 +133,7 @@ export class PluginsStatusService { directDependencies.forEach((dep) => (dependenciesStatus[dep] = allStatus[dep])); return dependenciesStatus; }), - debounceTime(10), - distinctUntilChanged>(isDeepStrictEqual) + debounceTime(10) ); } @@ -255,19 +233,25 @@ export class PluginsStatusService { } /** - * Determine the derived status of the specified plugin and update it on the pluginData structure - * Optionally, if the plugin has not registered a custom status Observable, update its "current" status as well - * @param {PluginName} plugin The name of the plugin to be updated + * Updates the core services statuses and plugins' statuses + * according to the latest status reported by core services. + * @param {CoreStatus} coreStatus the latest status of core services */ - private updatePluginStatus(plugin: PluginName): void { - const newStatus = this.determinePluginStatus(plugin); + private updateCoreAndPluginStatuses(coreStatus: CoreStatus): void { + this.coreStatus = coreStatus!; + const derivedStatus = getSummaryStatus(Object.entries(this.coreStatus), { + allAvailableSummary: `All dependencies are available`, + }); - this.pluginData[plugin].derivedStatus = newStatus; + this.rootPlugins.forEach((plugin) => { + this.pluginData[plugin].derivedStatus = derivedStatus; + if (!this.isReportingStatus[plugin]) { + // this root plugin has NOT registered any status Observable. Thus, its status is derived from core + this.pluginStatus[plugin] = derivedStatus; + } + }); - if (!this.isReportingStatus[plugin]) { - // this plugin has NOT registered any status Observable. Thus, its status is derived from its dependencies + core - this.pluginStatus[plugin] = newStatus; - } + this.updatePluginsStatuses(this.rootPlugins); } /** @@ -279,14 +263,16 @@ export class PluginsStatusService { private updatePluginsStatuses(plugins: PluginName[]): void { const toCheck = new Set(plugins); - // Note that we are updating the plugins in an ordered fashion - // this way, when updating plugin X (at depth = N), all of its dependencies (at depth < N) have already been updated + // Note that we are updating the plugins in an ordered fashion. + // This way, when updating plugin X (at depth = N), + // all of its dependencies (at depth < N) have already been updated for (let i = 0; i < this.orderedPluginNames.length; ++i) { const current = this.orderedPluginNames[i]; if (toCheck.has(current)) { // update the current plugin status this.updatePluginStatus(current); // flag all its reverse dependencies to be checked + // TODO flag them only IF the status of this plugin has changed, seems to break some tests this.pluginData[current].reverseDependencies.forEach((revDep) => toCheck.add(revDep)); } } @@ -295,6 +281,22 @@ export class PluginsStatusService { this.pluginStatus$.next({ ...this.pluginStatus }); } + /** + * Determine the derived status of the specified plugin and update it on the pluginData structure + * Optionally, if the plugin has not registered a custom status Observable, update its "current" status as well + * @param {PluginName} plugin The name of the plugin to be updated + */ + private updatePluginStatus(plugin: PluginName): void { + const newStatus = this.determinePluginStatus(plugin); + this.pluginData[plugin].derivedStatus = newStatus; + + if (!this.isReportingStatus[plugin]) { + // this plugin has NOT registered any status Observable. + // Thus, its status is derived from its dependencies + core + this.pluginStatus[plugin] = newStatus; + } + } + /** * Deterime the current plugin status, taking into account its reported status, its derived status * and the status of the core services @@ -321,4 +323,20 @@ export class PluginsStatusService { return newStatus; } + + /** + * Updates the reported status for the given plugin, along with the status of its dependencies tree. + * @param {PluginName} plugin The name of the plugin whose reported status must be updated + * @param {ServiceStatus} reportedStatus The newly reported status for that plugin + */ + private updatePluginReportedStatus(plugin: PluginName, reportedStatus: ServiceStatus): void { + const previousReportedLevel = this.pluginData[plugin].reportedStatus?.level; + + this.pluginData[plugin].reportedStatus = reportedStatus; + this.pluginStatus[plugin] = reportedStatus; + + if (reportedStatus.level !== previousReportedLevel) { + this.updatePluginsStatuses([plugin]); + } + } } diff --git a/src/core/server/status/status_service.test.ts b/src/core/server/status/status_service.test.ts index 3ed37c23707ef..262667fddf26a 100644 --- a/src/core/server/status/status_service.test.ts +++ b/src/core/server/status/status_service.test.ts @@ -239,20 +239,20 @@ describe('StatusService', () => { // Wait for timers to ensure that duplicate events are still filtered out regardless of debouncing. elasticsearch$.next(available); - await delay(200); + await delay(100); elasticsearch$.next(available); - await delay(200); + await delay(100); elasticsearch$.next({ level: ServiceStatusLevels.available, summary: `Wow another summary`, }); - await delay(200); + await delay(100); savedObjects$.next(degraded); - await delay(200); + await delay(100); savedObjects$.next(available); - await delay(200); + await delay(100); savedObjects$.next(available); - await delay(200); + await delay(100); subscription.unsubscribe(); expect(statusUpdates).toMatchInlineSnapshot(` @@ -300,9 +300,9 @@ describe('StatusService', () => { savedObjects$.next(available); savedObjects$.next(degraded); // Waiting for the debounce timeout should cut a new update - await delay(230); + await delay(100); savedObjects$.next(available); - await delay(230); + await delay(100); subscription.unsubscribe(); expect(statusUpdates).toMatchInlineSnapshot(` @@ -410,20 +410,20 @@ describe('StatusService', () => { // Wait for timers to ensure that duplicate events are still filtered out regardless of debouncing. elasticsearch$.next(available); - await delay(200); + await delay(100); elasticsearch$.next(available); - await delay(200); + await delay(100); elasticsearch$.next({ level: ServiceStatusLevels.available, summary: `Wow another summary`, }); - await delay(200); + await delay(100); savedObjects$.next(degraded); - await delay(200); + await delay(100); savedObjects$.next(available); - await delay(200); + await delay(100); savedObjects$.next(available); - await delay(200); + await delay(100); subscription.unsubscribe(); expect(statusUpdates).toMatchInlineSnapshot(` @@ -471,9 +471,9 @@ describe('StatusService', () => { savedObjects$.next(available); savedObjects$.next(degraded); // Waiting for the debounce timeout should cut a new update - await delay(200); + await delay(100); savedObjects$.next(available); - await delay(200); + await delay(100); subscription.unsubscribe(); expect(statusUpdates).toMatchInlineSnapshot(` diff --git a/src/core/server/status/status_service.ts b/src/core/server/status/status_service.ts index a7e5aff408379..a5b5f0a37397a 100644 --- a/src/core/server/status/status_service.ts +++ b/src/core/server/status/status_service.ts @@ -71,7 +71,7 @@ export class StatusService implements CoreService { this.overall$ = combineLatest([core$, this.pluginsStatus.getAll$()]).pipe( // Prevent many emissions at once from dependency status resolution from making this too noisy - debounceTime(180), + debounceTime(80), map(([coreStatus, pluginsStatus]) => { const summary = getSummaryStatus([ ...Object.entries(coreStatus), From 9db84660dde0bdc430830e3d74d3f3f624f014ae Mon Sep 17 00:00:00 2001 From: Gerard Soldevila Date: Fri, 25 Mar 2022 09:53:37 +0100 Subject: [PATCH 8/8] Remove fixed FIXME --- src/core/server/status/plugins_status.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/core/server/status/plugins_status.test.ts b/src/core/server/status/plugins_status.test.ts index efa2342bb9ca5..c07624826ff83 100644 --- a/src/core/server/status/plugins_status.test.ts +++ b/src/core/server/status/plugins_status.test.ts @@ -15,7 +15,6 @@ import { ServiceStatusLevelSnapshotSerializer } from './test_utils'; expect.addSnapshotSerializer(ServiceStatusLevelSnapshotSerializer); -// FIXME temporarily skipping these tests to asses performance of this solution for https://github.com/elastic/kibana/issues/128061 describe('PluginStatusService', () => { const coreAllAvailable$: Observable = of({ elasticsearch: { level: ServiceStatusLevels.available, summary: 'elasticsearch avail' },