Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/core/server/status/cached_plugins_status.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

import { Observable } from 'rxjs';

import type { PluginName } from '../plugins';
import type { ServiceStatus } from './types';

import type { Deps } from './plugins_status';
import { PluginsStatusService as BasePluginsStatusService } from './plugins_status';

export class PluginsStatusService extends BasePluginsStatusService {
private all$?: Observable<Record<PluginName, ServiceStatus>>;
private dependenciesStatuses$: Record<PluginName, Observable<Record<PluginName, ServiceStatus>>>;
private derivedStatuses$: Record<PluginName, Observable<ServiceStatus>>;

constructor(deps: Deps) {
super(deps);
this.dependenciesStatuses$ = {};
this.derivedStatuses$ = {};
}

public getAll$(): Observable<Record<PluginName, ServiceStatus>> {
if (!this.all$) {
this.all$ = super.getAll$();
}

return this.all$;
}

public getDependenciesStatus$(plugin: PluginName): Observable<Record<PluginName, ServiceStatus>> {
if (!this.dependenciesStatuses$[plugin]) {
this.dependenciesStatuses$[plugin] = super.getDependenciesStatus$(plugin);
}

return this.dependenciesStatuses$[plugin];
}

public getDerivedStatus$(plugin: PluginName): Observable<ServiceStatus> {
if (!this.derivedStatuses$[plugin]) {
this.derivedStatuses$[plugin] = super.getDerivedStatus$(plugin);
}

return this.derivedStatuses$[plugin];
}
}
63 changes: 47 additions & 16 deletions src/core/server/status/plugins_status.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { PluginName } from '../plugins';
import { PluginsStatusService } from './plugins_status';
import { of, Observable, BehaviorSubject, ReplaySubject } from 'rxjs';
import { ServiceStatusLevels, CoreStatus, ServiceStatus } from './types';
import { first } from 'rxjs/operators';
import { first, skip } from 'rxjs/operators';
import { ServiceStatusLevelSnapshotSerializer } from './test_utils';

expect.addSnapshotSerializer(ServiceStatusLevelSnapshotSerializer);
Expand Down Expand Up @@ -215,7 +215,7 @@ describe('PluginStatusService', () => {
service.set('a', of({ level: ServiceStatusLevels.available, summary: 'a status' }));

expect(await service.getAll$().pipe(first()).toPromise()).toEqual({
a: { level: ServiceStatusLevels.available, summary: 'a status' }, // a is available depsite savedObjects being degraded
a: { level: ServiceStatusLevels.available, summary: 'a status' }, // a is available despite savedObjects being degraded
b: {
level: ServiceStatusLevels.degraded,
summary: '1 service is degraded: savedObjects',
Expand All @@ -239,6 +239,10 @@ describe('PluginStatusService', () => {
const statusUpdates: Array<Record<PluginName, ServiceStatus>> = [];
const subscription = service
.getAll$()
// If we subscribe to the $getAll() Observable BEFORE setting a custom status Observable
// for a given plugin ('a' in this test), then the first emission will happen
// right after core$ services Observable emits
.pipe(skip(1))
.subscribe((pluginStatuses) => statusUpdates.push(pluginStatuses));

service.set('a', of({ level: ServiceStatusLevels.degraded, summary: 'a degraded' }));
Expand All @@ -261,6 +265,8 @@ describe('PluginStatusService', () => {
const statusUpdates: Array<Record<PluginName, ServiceStatus>> = [];
const subscription = service
.getAll$()
// the first emission happens right after core services emit (see explanation above)
.pipe(skip(1))
.subscribe((pluginStatuses) => statusUpdates.push(pluginStatuses));

const aStatus$ = new BehaviorSubject<ServiceStatus>({
Expand All @@ -279,20 +285,48 @@ describe('PluginStatusService', () => {
]);
});

it('emits an unavailable status if first emission times out, then continues future emissions', async () => {
jest.useFakeTimers();
it('updates when a plugin status observable emits with the same level but a different summary', async () => {
const service = new PluginsStatusService({
core$: coreAllAvailable$,
pluginDependencies: new Map([
['a', []],
['b', ['a']],
]),
pluginDependencies: new Map([['a', []]]),
});
const statusUpdates: Array<Record<PluginName, ServiceStatus>> = [];
const subscription = service
.getAll$()
// the first emission happens right after core services emit (see explanation above)
.pipe(skip(1))
.subscribe((pluginStatuses) => statusUpdates.push(pluginStatuses));

const aStatus$ = new BehaviorSubject<ServiceStatus>({
level: ServiceStatusLevels.available,
summary: 'summary initial',
});
service.set('a', aStatus$);
aStatus$.next({ level: ServiceStatusLevels.available, summary: 'summary updated' });
subscription.unsubscribe();

expect(statusUpdates).toEqual([
{ a: { level: ServiceStatusLevels.available, summary: 'summary initial' } },
{ a: { level: ServiceStatusLevels.available, summary: 'summary updated' } },
]);
});

it('emits an unavailable status if first emission times out, then continues future emissions', async () => {
const service = new PluginsStatusService(
{
core$: coreAllAvailable$,
pluginDependencies: new Map([
['a', []],
['b', ['a']],
]),
},
10 // set a small timeout so that the registered status Observable for 'a' times out quickly
);

const pluginA$ = new ReplaySubject<ServiceStatus>(1);
service.set('a', pluginA$);
const firstEmission = service.getAll$().pipe(first()).toPromise();
jest.runAllTimers();
// the first emission happens right after core$ services emit
const firstEmission = service.getAll$().pipe(skip(1), first()).toPromise();

expect(await firstEmission).toEqual({
a: { level: ServiceStatusLevels.unavailable, summary: 'Status check timed out after 30s' },
Expand All @@ -308,16 +342,16 @@ describe('PluginStatusService', () => {

pluginA$.next({ level: ServiceStatusLevels.available, summary: 'a available' });
const secondEmission = service.getAll$().pipe(first()).toPromise();
jest.runAllTimers();
expect(await secondEmission).toEqual({
a: { level: ServiceStatusLevels.available, summary: 'a available' },
b: { level: ServiceStatusLevels.available, summary: 'All dependencies are available' },
});
jest.useRealTimers();
});
});

describe('getDependenciesStatus$', () => {
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

it('only includes dependencies of specified plugin', async () => {
const service = new PluginsStatusService({
core$: coreAllAvailable$,
Expand Down Expand Up @@ -357,7 +391,7 @@ describe('PluginStatusService', () => {

it('debounces plugins custom status registration', async () => {
const service = new PluginsStatusService({
core$: coreAllAvailable$,
core$: coreOneCriticalOneDegraded$,
pluginDependencies,
});
const available: ServiceStatus = {
Expand All @@ -375,8 +409,6 @@ describe('PluginStatusService', () => {

expect(statusUpdates).toStrictEqual([]);

const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

// Waiting for the debounce timeout should cut a new update
await delay(25);
subscription.unsubscribe();
Expand Down Expand Up @@ -404,7 +436,6 @@ describe('PluginStatusService', () => {
const subscription = service
.getDependenciesStatus$('b')
.subscribe((status) => statusUpdates.push(status));
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

pluginA$.next(degraded);
pluginA$.next(available);
Expand Down
Loading