From 0b8d10f213855e5db076a0b3e598d9b8908054e1 Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Sat, 7 Dec 2019 15:10:42 -0500 Subject: [PATCH 01/30] Base service with receiving_telemetry check --- x-pack/plugins/pulse_poc/README.md | 61 ++++++++++++++ x-pack/plugins/pulse_poc/kibana.json | 8 ++ .../default/check_receiving_telemetry.ts | 35 ++++++++ x-pack/plugins/pulse_poc/server/index.ts | 11 +++ x-pack/plugins/pulse_poc/server/plugin.ts | 81 +++++++++++++++++++ 5 files changed, 196 insertions(+) create mode 100644 x-pack/plugins/pulse_poc/README.md create mode 100644 x-pack/plugins/pulse_poc/kibana.json create mode 100644 x-pack/plugins/pulse_poc/server/channels/default/check_receiving_telemetry.ts create mode 100644 x-pack/plugins/pulse_poc/server/index.ts create mode 100644 x-pack/plugins/pulse_poc/server/plugin.ts diff --git a/x-pack/plugins/pulse_poc/README.md b/x-pack/plugins/pulse_poc/README.md new file mode 100644 index 0000000000000..28850b41e4340 --- /dev/null +++ b/x-pack/plugins/pulse_poc/README.md @@ -0,0 +1,61 @@ +# Pulse Service POC + +This is the POC service code. This wouldn't actually be in the Kibana project, but it is for the POC. + +## How it works + +The Pulse **client** in Kibana would send telemetry organized by *channel* periodically to the Pulse service's intake API. Independently, the Pulse client polls the instructions API on the Pulse service to receive any instructions that it should act on. + +The Pulse **service** performs *checks* across all of its channels when the client requests it, and it returns any *instructions* that are generated by those checks. Kibana would be orchestrated to react to those instructions. + +## Set up + +You must have the `pulse-poc-raw` index created for the APIs to work properly. +Start up Kibana and run the following requests in the Kibana Dev Tools Console: + +``` +PUT pulse-poc-raw +{ + "settings" : { + "number_of_shards" : 1 + }, + "mappings" : { + "properties" : { + "deployment_id" : { "type" : "keyword" } + } + } +} +POST /pulse-poc-raw/_doc +{ + "deployment_id": "123" +} +``` + +This index is for the raw telemetry payloads received by the service. + +## Rest API + +These APIs are accessible through Kibana for this POC. The actual service would +live at a dedicated elastic.co URL. + +### `POST /api/pulse_poc/intake` + +Used to send channel-based telemetry into the pulse service. + +### `GET /api/pulse_poc/instructions/{deploymentId}` + +Used to retrieve the current set of instructions for the current deployment. The response will include any instructions for the given deployment, organized by the channel they are associated with. + +## Channels + +Any directory inside `pulse_poc/server/channels` will become an available +channel. + +## Checks + +Any TypeScript file beginning with `check_` inside of a channel directory will +be imported as a "check" module. These modules should export `async check()` +which returns `undefined` if the check should not generate any instructions +for this deployment. If the conditions of the check do warrant sending +instructions to this deployment, then `check` should return those instructions +in the form of a plain JS object. diff --git a/x-pack/plugins/pulse_poc/kibana.json b/x-pack/plugins/pulse_poc/kibana.json new file mode 100644 index 0000000000000..119eb99df067f --- /dev/null +++ b/x-pack/plugins/pulse_poc/kibana.json @@ -0,0 +1,8 @@ +{ + "id": "pulse_poc", + "version": "8.0.0", + "kibanaVersion": "kibana", + "requiredPlugins": [], + "server": true, + "ui": false +} diff --git a/x-pack/plugins/pulse_poc/server/channels/default/check_receiving_telemetry.ts b/x-pack/plugins/pulse_poc/server/channels/default/check_receiving_telemetry.ts new file mode 100644 index 0000000000000..15997e3954851 --- /dev/null +++ b/x-pack/plugins/pulse_poc/server/channels/default/check_receiving_telemetry.ts @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { IScopedClusterClient } from 'src/core/server'; + +export async function check(es: IScopedClusterClient, deploymentId: string) { + const response = await es.callAsInternalUser('search', { + index: 'pulse-poc-raw', + size: 0, + allow_no_indices: true, + ignore_unavailable: true, + body: { + query: { + term: { + deployment_id: { + value: deploymentId, + }, + }, + }, + }, + }); + + if (response.hits.total.value > 0) { + return undefined; + } else { + return { + owner: 'core', + id: 'pulse_telemetry', + value: 'try_again', + }; + } +} diff --git a/x-pack/plugins/pulse_poc/server/index.ts b/x-pack/plugins/pulse_poc/server/index.ts new file mode 100644 index 0000000000000..59b64dff31e0c --- /dev/null +++ b/x-pack/plugins/pulse_poc/server/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { PluginInitializerContext } from '../../../../src/core/server'; +import { PulsePocPlugin } from './plugin'; + +export const plugin = (initializerContext: PluginInitializerContext) => + new PulsePocPlugin(initializerContext); diff --git a/x-pack/plugins/pulse_poc/server/plugin.ts b/x-pack/plugins/pulse_poc/server/plugin.ts new file mode 100644 index 0000000000000..4216e879a1085 --- /dev/null +++ b/x-pack/plugins/pulse_poc/server/plugin.ts @@ -0,0 +1,81 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { readdirSync } from 'fs'; +import { resolve } from 'path'; +import { schema } from '@kbn/config-schema'; +import { CoreSetup, PluginInitializerContext } from 'src/core/server'; + +export class PulsePocPlugin { + private channels = readdirSync(resolve(__dirname, 'channels')) + .filter((fileName: string) => !fileName.startsWith('.')) + .map((channelName: string) => { + const channelPath = resolve(__dirname, 'channels', channelName); + const checks = readdirSync(channelPath) + .filter((fileName: string) => fileName.startsWith('check_')) + .map((fileName: string) => { + const id = fileName.slice(6, -3); + const checkFilePath = resolve(channelPath, fileName); + const check = require(checkFilePath).check; + return { id, check }; + }); + return { + id: channelName, + checks, + }; + }); + + constructor(private readonly initializerContext: PluginInitializerContext) {} + + public async setup(core: CoreSetup) { + const router = core.http.createRouter(); + + router.post( + { + path: '/api/pulse_poc/intake', + validate: { + params: schema.object({ + channels: schema.arrayOf(schema.object({})), + }), + }, + }, + async (context, request, response) => { + return response.ok(); + } + ); + + router.get( + { + path: '/api/pulse_poc/instructions/{deploymentId}', + validate: { + query: schema.object({ + deploymentId: schema.string(), + }), + }, + }, + async (context, request, response) => { + const { deploymentId } = request.query; + const es = context.core.elasticsearch.adminClient; + + const allChannelCheckResults = this.channels.map(async channel => { + const channelChecks = channel.checks.map(check => check.check(es, deploymentId)); + const checkResults = await Promise.all(channelChecks); + const instructions = checkResults.filter((value: any) => Boolean(value)); + return { + id: channel.id, + instructions, + }; + }); + const channels = await Promise.all(allChannelCheckResults); + return response.ok({ body: { channels } }); + } + ); + } + + public start() {} + + public stop() {} +} From f89626378beafe32e9cee76c1417e6e33782f233 Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Sun, 8 Dec 2019 13:36:25 -0500 Subject: [PATCH 02/30] Shell of a pulse client in core --- src/core/server/pulse/index.ts | 37 ++++++++++++++++++++++++++++++++++ src/core/server/server.ts | 7 +++++++ 2 files changed, 44 insertions(+) create mode 100644 src/core/server/pulse/index.ts diff --git a/src/core/server/pulse/index.ts b/src/core/server/pulse/index.ts new file mode 100644 index 0000000000000..d6872755c5154 --- /dev/null +++ b/src/core/server/pulse/index.ts @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { CoreContext } from '../core_context'; +import { Logger } from '../logging'; +import { ElasticsearchServiceSetup } from '../elasticsearch'; + +export interface PulseSetupDeps { + elasticsearch: ElasticsearchServiceSetup; +} + +export class PulseService { + private readonly log: Logger; + + constructor(coreContext: CoreContext) { + this.log = coreContext.logger.get('pulse-service'); + } + public async setup(deps: PulseSetupDeps) { + this.log.debug('Setting up pulse service'); + } +} diff --git a/src/core/server/server.ts b/src/core/server/server.ts index e7166f30caa34..d28bba83ea350 100644 --- a/src/core/server/server.ts +++ b/src/core/server/server.ts @@ -22,6 +22,7 @@ import { Type } from '@kbn/config-schema'; import { ConfigService, Env, Config, ConfigPath } from './config'; import { ElasticsearchService } from './elasticsearch'; +import { PulseService } from './pulse'; import { HttpService, InternalHttpServiceSetup } from './http'; import { LegacyService, ensureValidConfiguration } from './legacy'; import { Logger, LoggerFactory } from './logging'; @@ -50,6 +51,7 @@ export class Server { private readonly capabilities: CapabilitiesService; private readonly context: ContextService; private readonly elasticsearch: ElasticsearchService; + private readonly pulse: PulseService; private readonly http: HttpService; private readonly legacy: LegacyService; private readonly log: Logger; @@ -71,6 +73,7 @@ export class Server { this.plugins = new PluginsService(core); this.legacy = new LegacyService(core); this.elasticsearch = new ElasticsearchService(core); + this.pulse = new PulseService(core); this.savedObjects = new SavedObjectsService(core); this.uiSettings = new UiSettingsService(core); this.capabilities = new CapabilitiesService(core); @@ -109,6 +112,10 @@ export class Server { http: httpSetup, }); + await this.pulse.setup({ + elasticsearch: elasticsearchServiceSetup, + }); + const uiSettingsSetup = await this.uiSettings.setup({ http: httpSetup, }); From cac89e51c08502adc30e3b08897270ab85bda91e Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Tue, 10 Dec 2019 09:08:52 -0500 Subject: [PATCH 03/30] Start of readme --- src/core/server/pulse/README.md | 23 +++++++++++++++++++++++ x-pack/plugins/pulse_poc/README.md | 13 +++++++++++++ 2 files changed, 36 insertions(+) create mode 100644 src/core/server/pulse/README.md diff --git a/src/core/server/pulse/README.md b/src/core/server/pulse/README.md new file mode 100644 index 0000000000000..cecf569a90c56 --- /dev/null +++ b/src/core/server/pulse/README.md @@ -0,0 +1,23 @@ +# Pulse client + +This is the pulse client in Kibana core that integrates with the remote Pulse +service. + +## How it works + +Every 5 seconds, we send a request to the remote Pulse service for any +instructions that are appropriate for this deployment. + +```js +let instructions = []; + +setInterval(() => { + instructions = getNewInstructions(); +}, 5000); + +class Pulse { + getMyInstructions(me) { + return instructions.filter(instruction => instruction.owner === me); + } +} +``` diff --git a/x-pack/plugins/pulse_poc/README.md b/x-pack/plugins/pulse_poc/README.md index 28850b41e4340..e0c1eef4f13f1 100644 --- a/x-pack/plugins/pulse_poc/README.md +++ b/x-pack/plugins/pulse_poc/README.md @@ -10,6 +10,19 @@ The Pulse **service** performs *checks* across all of its channels when the clie ## Set up +You must run Elasticsearch with security disabled, for example: + +```sh +yarn es snapshot -E xpack.security.enabled=false +``` + +You must run Kibana on localhost and port 5601 (the defaults) and not be using +the basepath proxy, like so: + +```sh +yarn start --no-base-path +``` + You must have the `pulse-poc-raw` index created for the APIs to work properly. Start up Kibana and run the following requests in the Kibana Dev Tools Console: From 2899489b9f06f84f486c4806dcfbcece84ea41ba Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Wed, 11 Dec 2019 15:25:45 -0500 Subject: [PATCH 04/30] Basic client polling --- src/core/server/pulse/channel.ts | 39 +++++++++++ src/core/server/pulse/index.ts | 80 +++++++++++++++++++++++ x-pack/plugins/pulse_poc/server/plugin.ts | 4 +- 3 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 src/core/server/pulse/channel.ts diff --git a/src/core/server/pulse/channel.ts b/src/core/server/pulse/channel.ts new file mode 100644 index 0000000000000..30c4377d7826b --- /dev/null +++ b/src/core/server/pulse/channel.ts @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { BehaviorSubject } from 'rxjs'; + +interface ChannelConfig { + id: string; + instructions$: BehaviorSubject; +} + +export class PulseChannel { + private readonly instructionsInternal$ = new BehaviorSubject([]); + + constructor(private readonly config: ChannelConfig) {} + + public get id() { + return this.config.id; + } + + public instructions$() { + return this.instructionsInternal$.asObservable(); + } +} diff --git a/src/core/server/pulse/index.ts b/src/core/server/pulse/index.ts index d6872755c5154..dbf0656564cdc 100644 --- a/src/core/server/pulse/index.ts +++ b/src/core/server/pulse/index.ts @@ -17,21 +17,101 @@ * under the License. */ +import { BehaviorSubject } from 'rxjs'; +// @ts-ignore +import fetch from 'node-fetch'; + import { CoreContext } from '../core_context'; import { Logger } from '../logging'; import { ElasticsearchServiceSetup } from '../elasticsearch'; +import { PulseChannel } from './channel'; export interface PulseSetupDeps { elasticsearch: ElasticsearchServiceSetup; } +interface ChannelResponse { + id: string; + instructions: Array>; +} + +interface InstructionsResponse { + channels: ChannelResponse[]; +} + export class PulseService { private readonly log: Logger; + private readonly channels: PulseChannel[]; + private readonly instructions: Map> = new Map(); constructor(coreContext: CoreContext) { this.log = coreContext.logger.get('pulse-service'); + this.channels = ['default'].map(id => { + const instructions$ = new BehaviorSubject([]); + this.instructions.set(id, instructions$); + return new PulseChannel({ id, instructions$ }); + }); } public async setup(deps: PulseSetupDeps) { this.log.debug('Setting up pulse service'); + + // poll for instructions every second for this deployment + setInterval(() => { + // eslint-disable-next-line no-console + this.loadInstructions().catch(err => console.error(err.stack)); + }, 1000); + + return { + channels: this.channels, + }; + } + + private retriableErrors = 0; + private async loadInstructions() { + const url = 'http://localhost:5601/api/pulse_poc/instructions/123'; + let response: any; + try { + response = await fetch(url); + } catch (err) { + if (!err.message.includes('ECONNREFUSED')) { + throw err; + } + this.handleRetriableError(); + return; + } + if (response.status === 503) { + this.handleRetriableError(); + return; + } + + if (response.status !== 200) { + const responseBody = await response.text(); + throw new Error(`${response.status}: ${responseBody}`); + } + + const responseBody: InstructionsResponse = await response.json(); + + responseBody.channels.forEach(channel => { + const instructions$ = this.instructions.get(channel.id); + if (!instructions$) { + throw new Error( + `Channel (${channel.id}) from service has no corresponding channel handler in client` + ); + } + + instructions$.next(channel.instructions); + }); + } + + private handleRetriableError() { + this.retriableErrors++; + if (this.retriableErrors === 1) { + // eslint-disable-next-line no-console + console.warn( + 'Kibana is not yet available at http://localhost:5601/api, will continue to check for the next 120 seconds...' + ); + } else if (this.retriableErrors > 120) { + this.retriableErrors = 0; + } } } diff --git a/x-pack/plugins/pulse_poc/server/plugin.ts b/x-pack/plugins/pulse_poc/server/plugin.ts index 4216e879a1085..31a8286d1f567 100644 --- a/x-pack/plugins/pulse_poc/server/plugin.ts +++ b/x-pack/plugins/pulse_poc/server/plugin.ts @@ -51,13 +51,13 @@ export class PulsePocPlugin { { path: '/api/pulse_poc/instructions/{deploymentId}', validate: { - query: schema.object({ + params: schema.object({ deploymentId: schema.string(), }), }, }, async (context, request, response) => { - const { deploymentId } = request.query; + const { deploymentId } = request.params; const es = context.core.elasticsearch.adminClient; const allChannelCheckResults = this.channels.map(async channel => { From a2da5c378728b664f455aa14e41f2c78d5a657a3 Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Wed, 11 Dec 2019 15:31:10 -0500 Subject: [PATCH 05/30] fix service readme --- x-pack/plugins/pulse_poc/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/plugins/pulse_poc/README.md b/x-pack/plugins/pulse_poc/README.md index e0c1eef4f13f1..bd692e95dff79 100644 --- a/x-pack/plugins/pulse_poc/README.md +++ b/x-pack/plugins/pulse_poc/README.md @@ -16,8 +16,8 @@ You must run Elasticsearch with security disabled, for example: yarn es snapshot -E xpack.security.enabled=false ``` -You must run Kibana on localhost and port 5601 (the defaults) and not be using -the basepath proxy, like so: +Only after Elasticsearch has finished starting up, you must start Kibana on +localhost and port 5601 (the defaults) and not be using the basepath proxy: ```sh yarn start --no-base-path From 0027f5116212967ea0c04fb2165bd327f381d260 Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Wed, 11 Dec 2019 15:31:50 -0500 Subject: [PATCH 06/30] remove incomplete client readme --- src/core/server/pulse/README.md | 23 ----------------------- 1 file changed, 23 deletions(-) delete mode 100644 src/core/server/pulse/README.md diff --git a/src/core/server/pulse/README.md b/src/core/server/pulse/README.md deleted file mode 100644 index cecf569a90c56..0000000000000 --- a/src/core/server/pulse/README.md +++ /dev/null @@ -1,23 +0,0 @@ -# Pulse client - -This is the pulse client in Kibana core that integrates with the remote Pulse -service. - -## How it works - -Every 5 seconds, we send a request to the remote Pulse service for any -instructions that are appropriate for this deployment. - -```js -let instructions = []; - -setInterval(() => { - instructions = getNewInstructions(); -}, 5000); - -class Pulse { - getMyInstructions(me) { - return instructions.filter(instruction => instruction.owner === me); - } -} -``` From 7013658d30a0ea66895687b861840760dd2445a1 Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Fri, 13 Dec 2019 10:24:43 -0500 Subject: [PATCH 07/30] client: disallow channels mutation --- src/core/server/pulse/index.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/core/server/pulse/index.ts b/src/core/server/pulse/index.ts index dbf0656564cdc..86315b51ba8c6 100644 --- a/src/core/server/pulse/index.ts +++ b/src/core/server/pulse/index.ts @@ -61,8 +61,14 @@ export class PulseService { this.loadInstructions().catch(err => console.error(err.stack)); }, 1000); + const channelEntries = this.channels.map((channel): [string, PulseChannel] => { + return [channel.id, channel]; + }); + return { - channels: this.channels, + getChannels() { + return new Map(channelEntries); + }, }; } From d4b9f0939e0c0bfaa703d5a3a463c24d35d6dacb Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Fri, 13 Dec 2019 10:47:58 -0500 Subject: [PATCH 08/30] create instructions observable from constructor arg --- src/core/server/pulse/channel.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/core/server/pulse/channel.ts b/src/core/server/pulse/channel.ts index 30c4377d7826b..271cbbd5a26b4 100644 --- a/src/core/server/pulse/channel.ts +++ b/src/core/server/pulse/channel.ts @@ -25,8 +25,6 @@ interface ChannelConfig { } export class PulseChannel { - private readonly instructionsInternal$ = new BehaviorSubject([]); - constructor(private readonly config: ChannelConfig) {} public get id() { @@ -34,6 +32,6 @@ export class PulseChannel { } public instructions$() { - return this.instructionsInternal$.asObservable(); + return this.config.instructions$.asObservable(); } } From d2078ac92de341d07a43f7e843b15ec2a0f8100e Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Fri, 13 Dec 2019 11:23:24 -0500 Subject: [PATCH 09/30] client: using instructions provided by service --- src/core/server/pulse/channel.ts | 10 +++++++-- src/core/server/pulse/index.ts | 37 +++++++++++++++++--------------- src/core/server/server.ts | 29 +++++++++++++++++++++++-- 3 files changed, 55 insertions(+), 21 deletions(-) diff --git a/src/core/server/pulse/channel.ts b/src/core/server/pulse/channel.ts index 271cbbd5a26b4..6250d981fabe0 100644 --- a/src/core/server/pulse/channel.ts +++ b/src/core/server/pulse/channel.ts @@ -17,11 +17,17 @@ * under the License. */ -import { BehaviorSubject } from 'rxjs'; +import { Subject } from 'rxjs'; + +export interface PulseInstruction { + owner: string; + id: string; + value: unknown; +} interface ChannelConfig { id: string; - instructions$: BehaviorSubject; + instructions$: Subject; } export class PulseChannel { diff --git a/src/core/server/pulse/index.ts b/src/core/server/pulse/index.ts index 86315b51ba8c6..6537148d1b346 100644 --- a/src/core/server/pulse/index.ts +++ b/src/core/server/pulse/index.ts @@ -17,14 +17,14 @@ * under the License. */ -import { BehaviorSubject } from 'rxjs'; +import { Subject } from 'rxjs'; // @ts-ignore import fetch from 'node-fetch'; import { CoreContext } from '../core_context'; import { Logger } from '../logging'; import { ElasticsearchServiceSetup } from '../elasticsearch'; -import { PulseChannel } from './channel'; +import { PulseChannel, PulseInstruction } from './channel'; export interface PulseSetupDeps { elasticsearch: ElasticsearchServiceSetup; @@ -32,7 +32,7 @@ export interface PulseSetupDeps { interface ChannelResponse { id: string; - instructions: Array>; + instructions: PulseInstruction[]; } interface InstructionsResponse { @@ -41,16 +41,19 @@ interface InstructionsResponse { export class PulseService { private readonly log: Logger; - private readonly channels: PulseChannel[]; - private readonly instructions: Map> = new Map(); + private readonly channels: Map; + private readonly instructions: Map> = new Map(); constructor(coreContext: CoreContext) { this.log = coreContext.logger.get('pulse-service'); - this.channels = ['default'].map(id => { - const instructions$ = new BehaviorSubject([]); - this.instructions.set(id, instructions$); - return new PulseChannel({ id, instructions$ }); - }); + this.channels = new Map( + ['default'].map((id): [string, PulseChannel] => { + const instructions$ = new Subject(); + this.instructions.set(id, instructions$); + const channel = new PulseChannel({ id, instructions$ }); + return [channel.id, channel]; + }) + ); } public async setup(deps: PulseSetupDeps) { this.log.debug('Setting up pulse service'); @@ -61,13 +64,13 @@ export class PulseService { this.loadInstructions().catch(err => console.error(err.stack)); }, 1000); - const channelEntries = this.channels.map((channel): [string, PulseChannel] => { - return [channel.id, channel]; - }); - return { - getChannels() { - return new Map(channelEntries); + getChannel: (id: string) => { + const channel = this.channels.get(id); + if (!channel) { + throw new Error(`Unknown channel: ${id}`); + } + return channel; }, }; } @@ -105,7 +108,7 @@ export class PulseService { ); } - instructions$.next(channel.instructions); + channel.instructions.forEach(instruction => instructions$.next(instruction)); }); } diff --git a/src/core/server/server.ts b/src/core/server/server.ts index d28bba83ea350..08f4c46660d41 100644 --- a/src/core/server/server.ts +++ b/src/core/server/server.ts @@ -17,7 +17,7 @@ * under the License. */ import { Observable } from 'rxjs'; -import { take } from 'rxjs/operators'; +import { filter, take } from 'rxjs/operators'; import { Type } from '@kbn/config-schema'; import { ConfigService, Env, Config, ConfigPath } from './config'; @@ -112,10 +112,35 @@ export class Server { http: httpSetup, }); - await this.pulse.setup({ + const pulseSetup = await this.pulse.setup({ elasticsearch: elasticsearchServiceSetup, }); + // example of retrieving instructions for a specific channel + const defaultChannelInstructions$ = pulseSetup.getChannel('default').instructions$(); + + // example of retrieving only instructions that you "own" + // use this to only pay attention to pulse instructions you care about + const coreInstructions$ = defaultChannelInstructions$.pipe( + filter(instruction => instruction.owner === 'core') + ); + + // example of retrieving only instructions of a specific type + // use this to only pay attention to specific instructions + const pulseTelemetryInstructions$ = coreInstructions$.pipe( + filter(instruction => instruction.id === 'pulse_telemetry') + ); + + // example of retrieving only instructions with a specific value + // use this when you want to handle a specific scenario/use case for some type of instruction + const retryTelemetryInstructions$ = pulseTelemetryInstructions$.pipe( + filter(instruction => instruction.value === 'try_again') + ); + + retryTelemetryInstructions$.subscribe(() => { + this.log.info(`Received instructions to retry telemetry collection`); + }); + const uiSettingsSetup = await this.uiSettings.setup({ http: httpSetup, }); From 3fb38ee95545077d723337a8becdd0a7baff398b Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Fri, 13 Dec 2019 11:51:07 -0500 Subject: [PATCH 10/30] log pulse service startup --- x-pack/plugins/pulse_poc/server/plugin.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/x-pack/plugins/pulse_poc/server/plugin.ts b/x-pack/plugins/pulse_poc/server/plugin.ts index 31a8286d1f567..c6c63e421d367 100644 --- a/x-pack/plugins/pulse_poc/server/plugin.ts +++ b/x-pack/plugins/pulse_poc/server/plugin.ts @@ -31,6 +31,11 @@ export class PulsePocPlugin { constructor(private readonly initializerContext: PluginInitializerContext) {} public async setup(core: CoreSetup) { + const logger = this.initializerContext.logger.get('pulse-service'); + logger.info( + `Starting up POC pulse service, which wouldn't actually be part of Kibana in reality` + ); + const router = core.http.createRouter(); router.post( From 99b265bd7c16e579ae5b16958c84fc466ae2a30f Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Fri, 13 Dec 2019 12:46:59 -0500 Subject: [PATCH 11/30] service: accept channel-based telemetry intake --- x-pack/plugins/pulse_poc/README.md | 25 +-------- .../default/check_receiving_telemetry.ts | 2 +- x-pack/plugins/pulse_poc/server/plugin.ts | 56 ++++++++++++++++++- 3 files changed, 56 insertions(+), 27 deletions(-) diff --git a/x-pack/plugins/pulse_poc/README.md b/x-pack/plugins/pulse_poc/README.md index bd692e95dff79..4abcd861d7223 100644 --- a/x-pack/plugins/pulse_poc/README.md +++ b/x-pack/plugins/pulse_poc/README.md @@ -23,35 +23,12 @@ localhost and port 5601 (the defaults) and not be using the basepath proxy: yarn start --no-base-path ``` -You must have the `pulse-poc-raw` index created for the APIs to work properly. -Start up Kibana and run the following requests in the Kibana Dev Tools Console: - -``` -PUT pulse-poc-raw -{ - "settings" : { - "number_of_shards" : 1 - }, - "mappings" : { - "properties" : { - "deployment_id" : { "type" : "keyword" } - } - } -} -POST /pulse-poc-raw/_doc -{ - "deployment_id": "123" -} -``` - -This index is for the raw telemetry payloads received by the service. - ## Rest API These APIs are accessible through Kibana for this POC. The actual service would live at a dedicated elastic.co URL. -### `POST /api/pulse_poc/intake` +### `POST /api/pulse_poc/intake/{deploymentId}` Used to send channel-based telemetry into the pulse service. diff --git a/x-pack/plugins/pulse_poc/server/channels/default/check_receiving_telemetry.ts b/x-pack/plugins/pulse_poc/server/channels/default/check_receiving_telemetry.ts index 15997e3954851..93172b0354083 100644 --- a/x-pack/plugins/pulse_poc/server/channels/default/check_receiving_telemetry.ts +++ b/x-pack/plugins/pulse_poc/server/channels/default/check_receiving_telemetry.ts @@ -8,7 +8,7 @@ import { IScopedClusterClient } from 'src/core/server'; export async function check(es: IScopedClusterClient, deploymentId: string) { const response = await es.callAsInternalUser('search', { - index: 'pulse-poc-raw', + index: 'pulse-poc-raw*', size: 0, allow_no_indices: true, ignore_unavailable: true, diff --git a/x-pack/plugins/pulse_poc/server/plugin.ts b/x-pack/plugins/pulse_poc/server/plugin.ts index c6c63e421d367..861779642ed91 100644 --- a/x-pack/plugins/pulse_poc/server/plugin.ts +++ b/x-pack/plugins/pulse_poc/server/plugin.ts @@ -40,14 +40,66 @@ export class PulsePocPlugin { router.post( { - path: '/api/pulse_poc/intake', + path: '/api/pulse_poc/intake/{deploymentId}', validate: { params: schema.object({ - channels: schema.arrayOf(schema.object({})), + deploymentId: schema.string(), + }), + body: schema.object({ + channels: schema.arrayOf( + schema.object({ + channel_id: schema.string({ + validate: value => { + if (!this.channels.some(channel => channel.id === value)) { + return `'${value}' is not a known channel`; + } + }, + }), + records: schema.arrayOf(schema.object({}, { allowUnknowns: true })), + }) + ), }), }, }, async (context, request, response) => { + const { deploymentId } = request.params; + const { channels } = request.body; + const es = context.core.elasticsearch.adminClient; + + for (const channel of channels) { + const index = `pulse-poc-raw-${channel.channel_id}`; + const exists = await es.callAsInternalUser('indices.exists', { index }); + if (!exists) { + const indexBody = { + settings: { + number_of_shards: 1, + }, + mappings: { + properties: { + channel_id: { type: 'keyword' }, + deployment_id: { type: 'keyword' }, + }, + }, + }; + + await es.callAsInternalUser('indices.create', { + index, + body: indexBody, + }); + } + + for (const record of channel.records) { + await es.callAsInternalUser('index', { + index, + body: { + ...record, + channel_id: channel.channel_id, + deployment_id: deploymentId, + }, + }); + } + } + return response.ok(); } ); From 6b1ecd7d24a2d1698455018d197e88ec2028c25f Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Fri, 13 Dec 2019 13:19:27 -0500 Subject: [PATCH 12/30] client: hardcoded telemetry collection --- src/core/server/pulse/index.ts | 46 ++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/core/server/pulse/index.ts b/src/core/server/pulse/index.ts index 6537148d1b346..c072bb7ba0451 100644 --- a/src/core/server/pulse/index.ts +++ b/src/core/server/pulse/index.ts @@ -64,6 +64,15 @@ export class PulseService { this.loadInstructions().catch(err => console.error(err.stack)); }, 1000); + // eslint-disable-next-line no-console + console.log('Will attempt first telemetry collection in 5 seconds...'); + setTimeout(() => { + setInterval(() => { + // eslint-disable-next-line no-console + this.sendTelemetry().catch(err => console.error(err.stack)); + }, 5000); + }, 5000); + return { getChannel: (id: string) => { const channel = this.channels.get(id); @@ -123,4 +132,41 @@ export class PulseService { this.retriableErrors = 0; } } + + private async sendTelemetry() { + const url = 'http://localhost:5601/api/pulse_poc/intake/123'; + let response: any; + try { + response = await fetch(url, { + method: 'post', + headers: { + 'content-type': 'application/json', + 'kbn-xsrf': 'true', + }, + body: JSON.stringify({ + channels: [ + { + channel_id: 'default', + records: [{}], + }, + ], + }), + }); + } catch (err) { + if (!err.message.includes('ECONNREFUSED')) { + throw err; + } + // the instructions polling should handle logging for this case, yay for POCs + return; + } + if (response.status === 503) { + // the instructions polling should handle logging for this case, yay for POCs + return; + } + + if (response.status !== 200) { + const responseBody = await response.text(); + throw new Error(`${response.status}: ${responseBody}`); + } + } } From 451da7c266e186854d19cd316d73e5eb30053270 Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Fri, 13 Dec 2019 13:46:51 -0500 Subject: [PATCH 13/30] client: send telemetry --- src/core/server/pulse/channel.ts | 6 +++++- src/core/server/pulse/collectors/default.ts | 22 +++++++++++++++++++++ src/core/server/pulse/index.ts | 17 ++++++++++------ 3 files changed, 38 insertions(+), 7 deletions(-) create mode 100644 src/core/server/pulse/collectors/default.ts diff --git a/src/core/server/pulse/channel.ts b/src/core/server/pulse/channel.ts index 6250d981fabe0..160effd47c094 100644 --- a/src/core/server/pulse/channel.ts +++ b/src/core/server/pulse/channel.ts @@ -31,7 +31,11 @@ interface ChannelConfig { } export class PulseChannel { - constructor(private readonly config: ChannelConfig) {} + public readonly getRecords: () => Promise>; + + constructor(private readonly config: ChannelConfig) { + this.getRecords = require(`${__dirname}/collectors/${this.id}`).getRecords; + } public get id() { return this.config.id; diff --git a/src/core/server/pulse/collectors/default.ts b/src/core/server/pulse/collectors/default.ts new file mode 100644 index 0000000000000..55b0e1b955304 --- /dev/null +++ b/src/core/server/pulse/collectors/default.ts @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export async function getRecords() { + return [{}]; +} diff --git a/src/core/server/pulse/index.ts b/src/core/server/pulse/index.ts index c072bb7ba0451..61f0b3d01950a 100644 --- a/src/core/server/pulse/index.ts +++ b/src/core/server/pulse/index.ts @@ -135,6 +135,16 @@ export class PulseService { private async sendTelemetry() { const url = 'http://localhost:5601/api/pulse_poc/intake/123'; + + const channels = []; + for (const channel of this.channels.values()) { + const records = await channel.getRecords(); + channels.push({ + records, + channel_id: channel.id, + }); + } + let response: any; try { response = await fetch(url, { @@ -144,12 +154,7 @@ export class PulseService { 'kbn-xsrf': 'true', }, body: JSON.stringify({ - channels: [ - { - channel_id: 'default', - records: [{}], - }, - ], + channels, }), }); } catch (err) { From 3bd969b8fec833c6fff4bc6f75fd71c2a64c6de6 Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Mon, 16 Dec 2019 09:36:27 -0500 Subject: [PATCH 14/30] client: expose pulse to plugins --- src/core/server/internal_types.ts | 2 ++ src/core/server/pulse/index.ts | 6 +++++- src/core/server/server.ts | 1 + 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/core/server/internal_types.ts b/src/core/server/internal_types.ts index 07fd77f83d774..2d2edcb429786 100644 --- a/src/core/server/internal_types.ts +++ b/src/core/server/internal_types.ts @@ -18,6 +18,7 @@ */ import { InternalElasticsearchServiceSetup } from './elasticsearch'; +import { InternalPulseService } from './pulse'; import { InternalHttpServiceSetup } from './http'; import { InternalUiSettingsServiceSetup } from './ui_settings'; import { ContextSetup } from './context'; @@ -33,6 +34,7 @@ export interface InternalCoreSetup { context: ContextSetup; http: InternalHttpServiceSetup; elasticsearch: InternalElasticsearchServiceSetup; + pulse: InternalPulseService; uiSettings: InternalUiSettingsServiceSetup; savedObjects: InternalSavedObjectsServiceSetup; } diff --git a/src/core/server/pulse/index.ts b/src/core/server/pulse/index.ts index 61f0b3d01950a..eb5b20210ea38 100644 --- a/src/core/server/pulse/index.ts +++ b/src/core/server/pulse/index.ts @@ -26,6 +26,10 @@ import { Logger } from '../logging'; import { ElasticsearchServiceSetup } from '../elasticsearch'; import { PulseChannel, PulseInstruction } from './channel'; +export interface InternalPulseService { + getChannel: (id: string) => PulseChannel; +} + export interface PulseSetupDeps { elasticsearch: ElasticsearchServiceSetup; } @@ -55,7 +59,7 @@ export class PulseService { }) ); } - public async setup(deps: PulseSetupDeps) { + public async setup(deps: PulseSetupDeps): Promise { this.log.debug('Setting up pulse service'); // poll for instructions every second for this deployment diff --git a/src/core/server/server.ts b/src/core/server/server.ts index 08f4c46660d41..03ca3463054e6 100644 --- a/src/core/server/server.ts +++ b/src/core/server/server.ts @@ -154,6 +154,7 @@ export class Server { capabilities: capabilitiesSetup, context: contextServiceSetup, elasticsearch: elasticsearchServiceSetup, + pulse: pulseSetup, http: httpSetup, uiSettings: uiSettingsSetup, savedObjects: savedObjectsSetup, From cb82e2dd2de4d4d1a67f287e3122150c327645c3 Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Mon, 16 Dec 2019 09:42:05 -0500 Subject: [PATCH 15/30] client: channel names based on file system --- src/core/server/pulse/index.ts | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/core/server/pulse/index.ts b/src/core/server/pulse/index.ts index eb5b20210ea38..e726aeac0acf0 100644 --- a/src/core/server/pulse/index.ts +++ b/src/core/server/pulse/index.ts @@ -17,6 +17,8 @@ * under the License. */ +import { readdirSync } from 'fs'; +import { resolve } from 'path'; import { Subject } from 'rxjs'; // @ts-ignore import fetch from 'node-fetch'; @@ -43,6 +45,12 @@ interface InstructionsResponse { channels: ChannelResponse[]; } +const channelNames = readdirSync(resolve(__dirname, 'collectors')) + .filter((fileName: string) => !fileName.startsWith('.')) + .map((fileName: string) => { + return fileName.slice(0, -3); + }); + export class PulseService { private readonly log: Logger; private readonly channels: Map; @@ -51,7 +59,7 @@ export class PulseService { constructor(coreContext: CoreContext) { this.log = coreContext.logger.get('pulse-service'); this.channels = new Map( - ['default'].map((id): [string, PulseChannel] => { + channelNames.map((id): [string, PulseChannel] => { const instructions$ = new Subject(); this.instructions.set(id, instructions$); const channel = new PulseChannel({ id, instructions$ }); @@ -59,6 +67,7 @@ export class PulseService { }) ); } + public async setup(deps: PulseSetupDeps): Promise { this.log.debug('Setting up pulse service'); From 788c749ed1a3517e8067a1a153bfe83b8ae48a14 Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Mon, 16 Dec 2019 09:46:46 -0500 Subject: [PATCH 16/30] readme: document how to work on this POC --- PULSE.md | 95 ++++++++++++++++++++++++++++++ x-pack/plugins/pulse_poc/README.md | 35 +---------- 2 files changed, 96 insertions(+), 34 deletions(-) create mode 100644 PULSE.md diff --git a/PULSE.md b/PULSE.md new file mode 100644 index 0000000000000..f04b09ba1f005 --- /dev/null +++ b/PULSE.md @@ -0,0 +1,95 @@ +# Pulse POC + +Pulse is the future of telemetry, and it aims to collect more granular data while +providing a more SaaS-like experience for all installs. + +At the heart of this proof of concept are two components: the *service* and the *client*. + +The *service* represents the remote REST API that would be deployed an elastic.co URL. In +this POC, the service exists as a plugin in Kibana so we can easily develop on it side by +side with the changes to Kibana itself. It's contained in +`[x-pack/plugins/pulse_poc](./x-pack/plugins/pulse_poc)` and is accessible at +`http://localhost:5601/api/pulse_poc`. + +The *client* represents the core service within Kibana that manages interactions with the +remote service. It's contained in [src/core/server/pulse](./src/core/server/pulse). + +The client periodically sends data, organized by *channel* to the service. The client +also periodically retrieves *instructions* from the service, which are also organized by +channel. In turn, the instructions are then used in relevant places in Kibana to extend +that functionality. Instructions are plain objects with an `owner` property, which +roughly indicates which plugin or core is responsible for handling this instruction, an +`id` property, which uniquely identifies a specific type of instruction, and a `value` +which contains content appropriate for that specific instruction. + +When the service receives data, it ingests it into channel-specific indices named with +the format `pulse-poc-raw-`. + +When the service receives a request for instructions, it iterates over all of its channel +definitions and invokes all defined `check` functions, and the return values of that +become the instructions in the response. + +## Running the POC + +You must run Elasticsearch with security disabled, for example: + +```sh +yarn es snapshot -E xpack.security.enabled=false +``` + +**You must wait for Elasticsearch to start up,** after which you must start Kibana on +localhost and port 5601 (the defaults) and not be using the basepath proxy: + +```sh +yarn start --no-base-path +``` + +## Adding a channel + +To add a channel to the service, create a directory for that channel in +`[x-pack/plugins/pulse_poc/server/channels](./x-pack/plugins/pulse_poc/server/channels)`. +An empty directory is technically enough to allow receiving data for that channel, but in +practice you'll want to add at least one `check` function so the channel is providing +value back to the product in some way. + +To add a channel to the client, create a TypeScript file for that channel in +`[src/core/server/pulse/collectors](./src/core/server/pulse/collectors)`. In practice, +this file should export a record collector. See "Sending data" below. + +## Adding instructions + +To add an instruction, you must create a `check` function in the channel of your choice. +To do so, create a TypeScript file under the channel directory of your choice and prefix +its name with `check_`. This file must return an asynchronous function `check`. + +Each `check` function either returns `undefined`, in which case that particular +instruction will not be included in the response, or it will return instruction object +that should be included in the response. + +## Sending data + +To send data from the client, you must use a channel *collector*. Collectors are defined +in `[src/core/server/pulse/collectors](./src/core/server/pulse/collectors)` and must each +export an asynchronous function `getRecords()`, which should return an array of one or +more telemetry records for that channel. Each record will ultimately be stored as an +individual document in that channel's index by the service. + +## Using instructions + +On the client, instructions are exposed as an observable, where each individual +instruction is a new value. Each channel has its own observable, which you retrieve via +`core.pulse.getChannel()`, like so: + +```js +class MyPlugin { + setup(core) { + const instructions$ = core.pulse.getChannel('default').instructions$(); + + instructions$.subscribe(instruction => { + // instruction = { owner: 'my', id: 'foo_instruction', value: { foo: 'bar' } } + }); + } +} +``` + +At that point, how the instruction is used is up to the individual integration. diff --git a/x-pack/plugins/pulse_poc/README.md b/x-pack/plugins/pulse_poc/README.md index 4abcd861d7223..b39825898b59d 100644 --- a/x-pack/plugins/pulse_poc/README.md +++ b/x-pack/plugins/pulse_poc/README.md @@ -2,26 +2,7 @@ This is the POC service code. This wouldn't actually be in the Kibana project, but it is for the POC. -## How it works - -The Pulse **client** in Kibana would send telemetry organized by *channel* periodically to the Pulse service's intake API. Independently, the Pulse client polls the instructions API on the Pulse service to receive any instructions that it should act on. - -The Pulse **service** performs *checks* across all of its channels when the client requests it, and it returns any *instructions* that are generated by those checks. Kibana would be orchestrated to react to those instructions. - -## Set up - -You must run Elasticsearch with security disabled, for example: - -```sh -yarn es snapshot -E xpack.security.enabled=false -``` - -Only after Elasticsearch has finished starting up, you must start Kibana on -localhost and port 5601 (the defaults) and not be using the basepath proxy: - -```sh -yarn start --no-base-path -``` +To understand how this is used, see [Pulse.md](../../../PULSE.md). ## Rest API @@ -35,17 +16,3 @@ Used to send channel-based telemetry into the pulse service. ### `GET /api/pulse_poc/instructions/{deploymentId}` Used to retrieve the current set of instructions for the current deployment. The response will include any instructions for the given deployment, organized by the channel they are associated with. - -## Channels - -Any directory inside `pulse_poc/server/channels` will become an available -channel. - -## Checks - -Any TypeScript file beginning with `check_` inside of a channel directory will -be imported as a "check" module. These modules should export `async check()` -which returns `undefined` if the check should not generate any instructions -for this deployment. If the conditions of the check do warrant sending -instructions to this deployment, then `check` should return those instructions -in the form of a plain JS object. From 362d071d6034642361f456df03d29b9ad186cf97 Mon Sep 17 00:00:00 2001 From: Court Ewing Date: Mon, 16 Dec 2019 09:49:34 -0500 Subject: [PATCH 17/30] readme: fix path links --- PULSE.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/PULSE.md b/PULSE.md index f04b09ba1f005..2da4f55ff7f9a 100644 --- a/PULSE.md +++ b/PULSE.md @@ -8,11 +8,11 @@ At the heart of this proof of concept are two components: the *service* and the The *service* represents the remote REST API that would be deployed an elastic.co URL. In this POC, the service exists as a plugin in Kibana so we can easily develop on it side by side with the changes to Kibana itself. It's contained in -`[x-pack/plugins/pulse_poc](./x-pack/plugins/pulse_poc)` and is accessible at +[`x-pack/plugins/pulse_poc`](./x-pack/plugins/pulse_poc) and is accessible at `http://localhost:5601/api/pulse_poc`. The *client* represents the core service within Kibana that manages interactions with the -remote service. It's contained in [src/core/server/pulse](./src/core/server/pulse). +remote service. It's contained in [`src/core/server/pulse`](./src/core/server/pulse). The client periodically sends data, organized by *channel* to the service. The client also periodically retrieves *instructions* from the service, which are also organized by @@ -47,13 +47,13 @@ yarn start --no-base-path ## Adding a channel To add a channel to the service, create a directory for that channel in -`[x-pack/plugins/pulse_poc/server/channels](./x-pack/plugins/pulse_poc/server/channels)`. +[`x-pack/plugins/pulse_poc/server/channels`](./x-pack/plugins/pulse_poc/server/channels). An empty directory is technically enough to allow receiving data for that channel, but in practice you'll want to add at least one `check` function so the channel is providing value back to the product in some way. To add a channel to the client, create a TypeScript file for that channel in -`[src/core/server/pulse/collectors](./src/core/server/pulse/collectors)`. In practice, +[`src/core/server/pulse/collectors`](./src/core/server/pulse/collectors). In practice, this file should export a record collector. See "Sending data" below. ## Adding instructions @@ -69,7 +69,7 @@ that should be included in the response. ## Sending data To send data from the client, you must use a channel *collector*. Collectors are defined -in `[src/core/server/pulse/collectors](./src/core/server/pulse/collectors)` and must each +in [`src/core/server/pulse/collectors`](./src/core/server/pulse/collectors) and must each export an asynchronous function `getRecords()`, which should return an array of one or more telemetry records for that channel. Each record will ultimately be stored as an individual document in that channel's index by the service. From 63cd7b937c26ac63d59264f51eb1afe69888a3f7 Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Tue, 17 Dec 2019 16:40:14 -0700 Subject: [PATCH 18/30] Adds basic framework for the errors channel --- src/core/server/pulse/collectors/errors.ts | 31 ++++++++++++ .../channels/errors/check_receiving_errors.ts | 47 +++++++++++++++++++ x-pack/plugins/pulse_poc/server/plugin.ts | 5 +- 3 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 src/core/server/pulse/collectors/errors.ts create mode 100644 x-pack/plugins/pulse_poc/server/channels/errors/check_receiving_errors.ts diff --git a/src/core/server/pulse/collectors/errors.ts b/src/core/server/pulse/collectors/errors.ts new file mode 100644 index 0000000000000..0c185b4f55270 --- /dev/null +++ b/src/core/server/pulse/collectors/errors.ts @@ -0,0 +1,31 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// getRecords should return an array of one or more telemetry +// records for the errors channel. Each record will ultimately +// be stored as an individual document in the errors channel index +// by the service +export async function getRecords() { + return [ + { + plugin: 'foo', + type: 'bar', + }, + ]; +} diff --git a/x-pack/plugins/pulse_poc/server/channels/errors/check_receiving_errors.ts b/x-pack/plugins/pulse_poc/server/channels/errors/check_receiving_errors.ts new file mode 100644 index 0000000000000..d646f97bc703e --- /dev/null +++ b/x-pack/plugins/pulse_poc/server/channels/errors/check_receiving_errors.ts @@ -0,0 +1,47 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { IScopedClusterClient } from 'src/core/server'; + +export async function check(es: IScopedClusterClient, deploymentId: string) { + // TODO: modify the search query for full text search + const response = await es.callAsInternalUser('search', { + index: 'pulse-poc-raw-errors', + size: 0, + allow_no_indices: true, + ignore_unavailable: true, + body: { + query: { + term: { + deployment_id: { + value: deploymentId, + }, + }, + }, + }, + }); + + if (response.hits.total.value === 0) { + // we haven't recorded any errors so there aren't instructions to send back + return undefined; + } else { + /* + we have recorded errors and need to send instructions back that will help + plugin owners resolve the errors + we'll need to parse the stack trace and get information from it regarding: + plugin name + error type (fatal, warning etc) + where it was first encountered + etc + TODO: see the logger for more info + */ + return { + owner: 'plugin', + id: 'pulse_errors', + value: 'Houston, we have a problem!', + }; + } +} diff --git a/x-pack/plugins/pulse_poc/server/plugin.ts b/x-pack/plugins/pulse_poc/server/plugin.ts index 861779642ed91..522fb9267706f 100644 --- a/x-pack/plugins/pulse_poc/server/plugin.ts +++ b/x-pack/plugins/pulse_poc/server/plugin.ts @@ -17,7 +17,7 @@ export class PulsePocPlugin { const checks = readdirSync(channelPath) .filter((fileName: string) => fileName.startsWith('check_')) .map((fileName: string) => { - const id = fileName.slice(6, -3); + const id = fileName.slice(6, -3); // removes the "check_" and ".ts" from the filename const checkFilePath = resolve(channelPath, fileName); const check = require(checkFilePath).check; return { id, check }; @@ -35,6 +35,9 @@ export class PulsePocPlugin { logger.info( `Starting up POC pulse service, which wouldn't actually be part of Kibana in reality` ); + // trying to make sure this works + logger.info(`We have ${this.channels.length} telemetry channels registered`); + this.channels.forEach((channelItem, index) => logger.info(`${index} and id ${channelItem.id}`)); const router = core.http.createRouter(); From af42ed35a92e718b8258225ca38bac1e1abb2913 Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Tue, 17 Dec 2019 16:48:07 -0700 Subject: [PATCH 19/30] Adds instructions to create index patterns --- PULSE.md | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/PULSE.md b/PULSE.md index 2da4f55ff7f9a..a3ce0c3c2e724 100644 --- a/PULSE.md +++ b/PULSE.md @@ -44,6 +44,62 @@ localhost and port 5601 (the defaults) and not be using the basepath proxy: yarn start --no-base-path ``` +### Add the index patterns +You'll need to add the index patterns for each channel. To make this easy, use the dev tools in Kibana once it's started up. +The default channel uses 'pulse-poc-raw': +```kibana dev tools +# create the pulse-poc-raw index pattern + +PUT pulse-poc-raw +{ + "settings": { + "number_of_shards": 1 + }, + "mappings": { + "properties": { + "deployment_id": {"type": "keyword"} + } + } +} + +# create a test document +POST /pulse-poc-raw/_doc +{ + "deployment_id": "123" +} + +GET pulse-poc-raw/_mapping +# create the pulse-poc-raw-errors index pattern +``` +The errors channel uses the pulse-poc-raw-errors index pattern: +Note: the mapping isn't fully defined yet :-) +``` +# create the pulse-poc-raw-errors index pattern + + +PUT pulse-poc-raw-errors +{ + "settings": { + "number_of_shards": 1 + }, + "mappings": { + "properties": { + "deployment_id": {"type": "keyword"}, + "error": { "type": "text"} + } + } +} + +POST /pulse-poc-raw-errors/_doc +{ + "deployment_id": "123", + "error": "Houston, we have a problem!" +} + +GET pulse-poc-raw-errors +``` + + ## Adding a channel To add a channel to the service, create a directory for that channel in From 4152499ad87941dfc9b1ed8974eecea8eeced7c4 Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Tue, 17 Dec 2019 16:53:00 -0700 Subject: [PATCH 20/30] Reverts changes in plugin.ts --- x-pack/plugins/pulse_poc/server/plugin.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/x-pack/plugins/pulse_poc/server/plugin.ts b/x-pack/plugins/pulse_poc/server/plugin.ts index 522fb9267706f..861779642ed91 100644 --- a/x-pack/plugins/pulse_poc/server/plugin.ts +++ b/x-pack/plugins/pulse_poc/server/plugin.ts @@ -17,7 +17,7 @@ export class PulsePocPlugin { const checks = readdirSync(channelPath) .filter((fileName: string) => fileName.startsWith('check_')) .map((fileName: string) => { - const id = fileName.slice(6, -3); // removes the "check_" and ".ts" from the filename + const id = fileName.slice(6, -3); const checkFilePath = resolve(channelPath, fileName); const check = require(checkFilePath).check; return { id, check }; @@ -35,9 +35,6 @@ export class PulsePocPlugin { logger.info( `Starting up POC pulse service, which wouldn't actually be part of Kibana in reality` ); - // trying to make sure this works - logger.info(`We have ${this.channels.length} telemetry channels registered`); - this.channels.forEach((channelItem, index) => logger.info(`${index} and id ${channelItem.id}`)); const router = core.http.createRouter(); From b8eaaa07c162fc0ff89797d1e0be143028ee25be Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Fri, 20 Dec 2019 10:50:15 -0700 Subject: [PATCH 21/30] Removes content added to the README --- PULSE.md | 59 -------------------------------------------------------- 1 file changed, 59 deletions(-) diff --git a/PULSE.md b/PULSE.md index ec35508cd7241..2da4f55ff7f9a 100644 --- a/PULSE.md +++ b/PULSE.md @@ -44,65 +44,6 @@ localhost and port 5601 (the defaults) and not be using the basepath proxy: yarn start --no-base-path ``` -<<<<<<< HEAD -### Add the index patterns -You'll need to add the index patterns for each channel. To make this easy, use the dev tools in Kibana once it's started up. -The default channel uses 'pulse-poc-raw': -```kibana dev tools -# create the pulse-poc-raw index pattern - -PUT pulse-poc-raw -{ - "settings": { - "number_of_shards": 1 - }, - "mappings": { - "properties": { - "deployment_id": {"type": "keyword"} - } - } -} - -# create a test document -POST /pulse-poc-raw/_doc -{ - "deployment_id": "123" -} - -GET pulse-poc-raw/_mapping -# create the pulse-poc-raw-errors index pattern -``` -The errors channel uses the pulse-poc-raw-errors index pattern: -Note: the mapping isn't fully defined yet :-) -``` -# create the pulse-poc-raw-errors index pattern - - -PUT pulse-poc-raw-errors -{ - "settings": { - "number_of_shards": 1 - }, - "mappings": { - "properties": { - "deployment_id": {"type": "keyword"}, - "error": { "type": "text"} - } - } -} - -POST /pulse-poc-raw-errors/_doc -{ - "deployment_id": "123", - "error": "Houston, we have a problem!" -} - -GET pulse-poc-raw-errors -``` - - -======= ->>>>>>> pulse_poc ## Adding a channel To add a channel to the service, create a directory for that channel in From b17b9d684cea24d229af422f919b04f2a1becddc Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Fri, 20 Dec 2019 10:53:20 -0700 Subject: [PATCH 22/30] Removes dummy return content from errors channel getRecords --- src/core/server/pulse/collectors/errors.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/core/server/pulse/collectors/errors.ts b/src/core/server/pulse/collectors/errors.ts index 0c185b4f55270..c7e360dc7506c 100644 --- a/src/core/server/pulse/collectors/errors.ts +++ b/src/core/server/pulse/collectors/errors.ts @@ -22,10 +22,5 @@ // be stored as an individual document in the errors channel index // by the service export async function getRecords() { - return [ - { - plugin: 'foo', - type: 'bar', - }, - ]; + return [{}]; } From 40e6a41a82e9f8dfb8ac4ccd295cec889e9d77de Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Fri, 10 Jan 2020 16:44:49 -0700 Subject: [PATCH 23/30] Done with adding changes from UI Notifications PR in x-pack/plugins/pulse_poc --- .../channels/errors/check_receiving_errors.ts | 25 ++++++++++++++----- .../notifications/check_notifications.ts | 13 ++++++++++ x-pack/plugins/pulse_poc/server/plugin.ts | 6 +++-- 3 files changed, 36 insertions(+), 8 deletions(-) create mode 100644 x-pack/plugins/pulse_poc/server/channels/notifications/check_notifications.ts diff --git a/x-pack/plugins/pulse_poc/server/channels/errors/check_receiving_errors.ts b/x-pack/plugins/pulse_poc/server/channels/errors/check_receiving_errors.ts index d646f97bc703e..f7d98e949ceb7 100644 --- a/x-pack/plugins/pulse_poc/server/channels/errors/check_receiving_errors.ts +++ b/x-pack/plugins/pulse_poc/server/channels/errors/check_receiving_errors.ts @@ -10,7 +10,7 @@ export async function check(es: IScopedClusterClient, deploymentId: string) { // TODO: modify the search query for full text search const response = await es.callAsInternalUser('search', { index: 'pulse-poc-raw-errors', - size: 0, + size: 100, allow_no_indices: true, ignore_unavailable: true, body: { @@ -38,10 +38,23 @@ export async function check(es: IScopedClusterClient, deploymentId: string) { etc TODO: see the logger for more info */ - return { - owner: 'plugin', - id: 'pulse_errors', - value: 'Houston, we have a problem!', - }; + return [ + { + owner: 'core', + id: 'pulse_error', + value: { + error_id: '1', + fix_version: '7.7.0', + }, + }, + { + owner: 'core', + id: 'pulse_error', + value: { + error_id: '2', + fix_version: null, + }, + }, + ]; } } diff --git a/x-pack/plugins/pulse_poc/server/channels/notifications/check_notifications.ts b/x-pack/plugins/pulse_poc/server/channels/notifications/check_notifications.ts new file mode 100644 index 0000000000000..d5514124eef08 --- /dev/null +++ b/x-pack/plugins/pulse_poc/server/channels/notifications/check_notifications.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export function check() { + return { + owner: 'ml_plugin', + id: 'pulse_telemetry', + value: {}, + }; +} diff --git a/x-pack/plugins/pulse_poc/server/plugin.ts b/x-pack/plugins/pulse_poc/server/plugin.ts index 861779642ed91..8c3d5274d917c 100644 --- a/x-pack/plugins/pulse_poc/server/plugin.ts +++ b/x-pack/plugins/pulse_poc/server/plugin.ts @@ -116,9 +116,11 @@ export class PulsePocPlugin { async (context, request, response) => { const { deploymentId } = request.params; const es = context.core.elasticsearch.adminClient; - const allChannelCheckResults = this.channels.map(async channel => { - const channelChecks = channel.checks.map(check => check.check(es, deploymentId)); + // const indexName = `pulse-poc-raw-${channel.id}`; + const channelChecks = channel.checks.map(check => + check.check(es, deploymentId) + ); const checkResults = await Promise.all(channelChecks); const instructions = checkResults.filter((value: any) => Boolean(value)); return { From de1135065e717db0466d984133e0332ba7a9fab8 Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Fri, 10 Jan 2020 16:48:23 -0700 Subject: [PATCH 24/30] Adds notifications collector to client --- .../server/pulse/collectors/notifications.ts | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 src/core/server/pulse/collectors/notifications.ts diff --git a/src/core/server/pulse/collectors/notifications.ts b/src/core/server/pulse/collectors/notifications.ts new file mode 100644 index 0000000000000..551dd94fba2fa --- /dev/null +++ b/src/core/server/pulse/collectors/notifications.ts @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export async function getRecords() { + return []; +} From 7833324fb19fbf07de5f9d90343f36073200de87 Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Fri, 10 Jan 2020 17:13:50 -0700 Subject: [PATCH 25/30] Updates changes from UI Notifications in errors collectors --- src/core/server/pulse/collectors/errors.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/core/server/pulse/collectors/errors.ts b/src/core/server/pulse/collectors/errors.ts index c7e360dc7506c..d4bc1442e005b 100644 --- a/src/core/server/pulse/collectors/errors.ts +++ b/src/core/server/pulse/collectors/errors.ts @@ -21,6 +21,17 @@ // records for the errors channel. Each record will ultimately // be stored as an individual document in the errors channel index // by the service + +export interface Payload { + errorId: string; +} + +const payloads: Payload[] = []; + +export async function putRecord(payload: Payload) { + payloads.push(payload); +} + export async function getRecords() { - return [{}]; + return payloads; } From bb031d64d8a9c06752ce526cf73ff3d333bbb691 Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Fri, 10 Jan 2020 17:21:51 -0700 Subject: [PATCH 26/30] Implements changes from UI notifications default collector and channel.ts --- src/core/server/pulse/channel.ts | 16 ++++++++++++---- src/core/server/pulse/collectors/default.ts | 13 ++++++++----- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/core/server/pulse/channel.ts b/src/core/server/pulse/channel.ts index b9e2061afba9f..a37fba7792f62 100644 --- a/src/core/server/pulse/channel.ts +++ b/src/core/server/pulse/channel.ts @@ -18,7 +18,7 @@ */ import { Subject } from 'rxjs'; -import { IClusterClient } from '../elasticsearch'; +// import { IClusterClient } from '../elasticsearch'; export interface PulseInstruction { owner: string; @@ -32,16 +32,24 @@ interface ChannelConfig { } export class PulseChannel { - public readonly getRecords: (elasticsearch: IClusterClient) => Promise>; - + public readonly getRecords: () => Promise>; + private readonly collector: any; constructor(private readonly config: ChannelConfig) { - this.getRecords = require(`${__dirname}/collectors/${this.id}`).getRecords; + this.collector = require(`${__dirname}/collectors/${this.id}`); + this.getRecords = this.collector.getRecords; } public get id() { return this.config.id; } + public sendPulse(payload: T) { + if (!this.collector.putRecord) { + throw Error(`this.collector.putRecords not implemented for ${this.id}.`); + } + this.collector.putRecord(payload); + } + public instructions$() { return this.config.instructions$.asObservable(); } diff --git a/src/core/server/pulse/collectors/default.ts b/src/core/server/pulse/collectors/default.ts index e692558630d32..8032f118cb8e3 100644 --- a/src/core/server/pulse/collectors/default.ts +++ b/src/core/server/pulse/collectors/default.ts @@ -17,10 +17,13 @@ * under the License. */ -import { IClusterClient } from '../../elasticsearch'; +export async function getRecords() { + return []; +} +// import { IClusterClient } from '../../elasticsearch'; -export async function getRecords(elasticsearch: IClusterClient) { - const pingResult = await elasticsearch.callAsInternalUser('ping'); +// export async function getRecords(elasticsearch: IClusterClient) { +// const pingResult = await elasticsearch.callAsInternalUser('ping'); - return [{ ping_received: pingResult }]; -} +// return [{ ping_received: pingResult }]; +// } From 36778b9ed3c28a2309d90893ac599428df013cba Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Fri, 10 Jan 2020 17:54:34 -0700 Subject: [PATCH 27/30] Implements changes to src/core/server/pulse/index.ts from UI Notifications and comments out this.pulse.start() and this.pulse.stop() in src/core/server/server.ts adds send_pulse --- src/core/server/pulse/index.ts | 178 +++++++++++++++++----------- src/core/server/pulse/send_pulse.ts | 67 +++++++++++ src/core/server/server.ts | 4 +- 3 files changed, 177 insertions(+), 72 deletions(-) create mode 100644 src/core/server/pulse/send_pulse.ts diff --git a/src/core/server/pulse/index.ts b/src/core/server/pulse/index.ts index b99275e3f73e0..7f93d0cc8c59c 100644 --- a/src/core/server/pulse/index.ts +++ b/src/core/server/pulse/index.ts @@ -19,14 +19,15 @@ import { readdirSync } from 'fs'; import { resolve, parse } from 'path'; + import { Subject } from 'rxjs'; // @ts-ignore import fetch from 'node-fetch'; - import { CoreContext } from '../core_context'; import { Logger } from '../logging'; -import { ElasticsearchServiceSetup, IClusterClient } from '../elasticsearch'; +import { ElasticsearchServiceSetup } from '../elasticsearch'; import { PulseChannel, PulseInstruction } from './channel'; +import { sendPulse, Fetcher } from './send_pulse'; export interface InternalPulseService { getChannel: (id: string) => PulseChannel; @@ -36,6 +37,9 @@ export interface PulseSetupDeps { elasticsearch: ElasticsearchServiceSetup; } +export type PulseServiceSetup = InternalPulseService; +export interface PulseServiceStart {} + interface ChannelResponse { id: string; instructions: PulseInstruction[]; @@ -53,18 +57,19 @@ const channelNames = readdirSync(resolve(__dirname, 'collectors')) }); export class PulseService { + private retriableErrors = 0; private readonly log: Logger; private readonly channels: Map; - private readonly instructions: Map> = new Map(); - private readonly subscriptions: Set = new Set(); - private elasticsearch?: IClusterClient; + private readonly instructions$: Map> = new Map(); + // private readonly subscriptions: Set = new Set(); + // private elasticsearch?: IClusterClient; constructor(coreContext: CoreContext) { this.log = coreContext.logger.get('pulse-service'); this.channels = new Map( channelNames.map((id): [string, PulseChannel] => { const instructions$ = new Subject(); - this.instructions.set(id, instructions$); + this.instructions$.set(id, instructions$); const channel = new PulseChannel({ id, instructions$ }); return [channel.id, channel]; }) @@ -72,9 +77,22 @@ export class PulseService { } public async setup(deps: PulseSetupDeps): Promise { - this.log.info('Setting up service'); + this.log.debug('Setting up pulse service'); - this.elasticsearch = deps.elasticsearch.createClient('pulse-service'); + // poll for instructions every second for this deployment + setInterval(() => { + // eslint-disable-next-line no-console + this.loadInstructions().catch(err => console.error(err.stack)); + }, 1000); + + // eslint-disable-next-line no-console + console.log('Will attempt first telemetry collection in 5 seconds...'); + setTimeout(() => { + setInterval(() => { + // eslint-disable-next-line no-console + this.sendTelemetry().catch(err => console.error(err.stack)); + }, 5000); + }, 5000); return { getChannel: (id: string) => { @@ -87,34 +105,6 @@ export class PulseService { }; } - public async start() { - this.log.info('Starting service'); - if (!this.elasticsearch) { - throw Error(`The 'PulseService.setup' method needs to be called before the 'start' method`); - } - const elasticsearch = this.elasticsearch; - - // poll for instructions every second for this deployment - const loadInstructionSubcription = setInterval(() => { - this.loadInstructions().catch(err => this.log.error(err.stack)); - }, 1000); - this.subscriptions.add(loadInstructionSubcription); - - this.log.debug('Will attempt first telemetry collection in 5 seconds...'); - const sendTelemetrySubcription = setInterval(() => { - this.sendTelemetry(elasticsearch).catch(err => this.log.error(err.stack)); - }, 5000); - this.subscriptions.add(sendTelemetrySubcription); - } - - public async stop() { - this.subscriptions.forEach(subscription => { - clearInterval(subscription); - this.subscriptions.delete(subscription); - }); - } - - private retriableErrors = 0; private async loadInstructions() { const url = 'http://localhost:5601/api/pulse_poc/instructions/123'; let response: any; @@ -140,7 +130,7 @@ export class PulseService { const responseBody: InstructionsResponse = await response.json(); responseBody.channels.forEach(channel => { - const instructions$ = this.instructions.get(channel.id); + const instructions$ = this.instructions$.get(channel.id); if (!instructions$) { throw new Error( `Channel (${channel.id}) from service has no corresponding channel handler in client` @@ -154,7 +144,8 @@ export class PulseService { private handleRetriableError() { this.retriableErrors++; if (this.retriableErrors === 1) { - this.log.warn( + // eslint-disable-next-line no-console + console.warn( 'Kibana is not yet available at http://localhost:5601/api, will continue to check for the next 120 seconds...' ); } else if (this.retriableErrors > 120) { @@ -162,24 +153,11 @@ export class PulseService { } } - private async sendTelemetry(elasticsearch: IClusterClient) { - this.log.debug('Sending telemetry'); - const url = 'http://localhost:5601/api/pulse_poc/intake/123'; - - const channels = []; - for (const channel of this.channels.values()) { - const records = await channel.getRecords(elasticsearch); - this.log.debug(`Channel "${channel.id}" returns the records ${JSON.stringify(records)}`); - channels.push({ - records, - channel_id: channel.id, - }); - } - - let response: any; - try { - response = await fetch(url, { + private async sendTelemetry() { + const fetcher: Fetcher = async (url, channels) => { + return await fetch(url, { method: 'post', + headers: { 'content-type': 'application/json', 'kbn-xsrf': 'true', @@ -188,21 +166,81 @@ export class PulseService { channels, }), }); - } catch (err) { - if (!err.message.includes('ECONNREFUSED')) { - throw err; - } - // the instructions polling should handle logging for this case, yay for POCs - return; - } - if (response.status === 503) { - // the instructions polling should handle logging for this case, yay for POCs - return; - } + }; - if (response.status !== 200) { - const responseBody = await response.text(); - throw new Error(`${response.status}: ${responseBody}`); - } + return await sendPulse(this.channels, fetcher); } } + +// public async start() { +// this.log.info('Starting service'); +// if (!this.elasticsearch) { +// throw Error(`The 'PulseService.setup' method needs to be called before the 'start' method`); +// } +// const elasticsearch = this.elasticsearch; + +// // poll for instructions every second for this deployment +// const loadInstructionSubcription = setInterval(() => { +// this.loadInstructions().catch(err => this.log.error(err.stack)); +// }, 1000); +// this.subscriptions.add(loadInstructionSubcription); + +// this.log.debug('Will attempt first telemetry collection in 5 seconds...'); +// const sendTelemetrySubcription = setInterval(() => { +// this.sendTelemetry(elasticsearch).catch(err => this.log.error(err.stack)); +// }, 5000); +// this.subscriptions.add(sendTelemetrySubcription); +// } + +// public async stop() { +// this.subscriptions.forEach(subscription => { +// clearInterval(subscription); +// this.subscriptions.delete(subscription); +// }); +// } + +// private retriableErrors = 0; + +// private async sendTelemetry(elasticsearch: IClusterClient) { +// this.log.debug('Sending telemetry'); +// const url = 'http://localhost:5601/api/pulse_poc/intake/123'; + +// const channels = []; +// for (const channel of this.channels.values()) { +// const records = await channel.getRecords(elasticsearch); +// this.log.debug(`Channel "${channel.id}" returns the records ${JSON.stringify(records)}`); +// channels.push({ +// records, +// channel_id: channel.id, +// }); +// } + +// let response: any; +// try { +// response = await fetch(url, { +// method: 'post', +// headers: { +// 'content-type': 'application/json', +// 'kbn-xsrf': 'true', +// }, +// body: JSON.stringify({ +// channels, +// }), +// }); +// } catch (err) { +// if (!err.message.includes('ECONNREFUSED')) { +// throw err; +// } +// // the instructions polling should handle logging for this case, yay for POCs +// return; +// } +// if (response.status === 503) { +// // the instructions polling should handle logging for this case, yay for POCs +// return; +// } + +// if (response.status !== 200) { +// const responseBody = await response.text(); +// throw new Error(`${response.status}: ${responseBody}`); +// } +// } diff --git a/src/core/server/pulse/send_pulse.ts b/src/core/server/pulse/send_pulse.ts new file mode 100644 index 0000000000000..2b58ef13d903d --- /dev/null +++ b/src/core/server/pulse/send_pulse.ts @@ -0,0 +1,67 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { PulseChannel } from './channel'; + +export const CLUSTER_UUID = '123'; +export const BASE_URL = 'http://localhost:5601/api/pulse_poc'; +export interface ChannelsToSend { + records: any; + channel_id: string; +} + +export type Fetcher = ( + url: string, + channelsToSend: ChannelsToSend[] +) => Promise; + +export async function sendPulse( + channels: Map, + fetcher: Fetcher +) { + const url = `${BASE_URL}/intake/${CLUSTER_UUID}`; + + const channelsToSend = []; + for (const channel of channels.values()) { + const records = await channel.getRecords(); + channelsToSend.push({ + records, + channel_id: channel.id, + }); + } + + let response: any; + try { + response = await fetcher(url, channelsToSend); + } catch (err) { + if (!err.message.includes('ECONNREFUSED')) { + throw err; + } + // the instructions polling should handle logging for this case, yay for POCs + return; + } + if (response.status === 503) { + // the instructions polling should handle logging for this case, yay for POCs + return; + } + if (response.status !== 200) { + const responseBody = await response.text(); + throw new Error(`${response.status}: ${responseBody}`); + } +} diff --git a/src/core/server/server.ts b/src/core/server/server.ts index ba9f65bfeb3d6..39c3d6d505ef2 100644 --- a/src/core/server/server.ts +++ b/src/core/server/server.ts @@ -212,7 +212,7 @@ export class Server { await this.http.start(); // Starting it after http because it relies on the the HTTP service - await this.pulse.start(); + // await this.pulse.start(); return coreStart; } @@ -220,7 +220,7 @@ export class Server { public async stop() { this.log.debug('stopping server'); - await this.pulse.stop(); + // await this.pulse.stop(); await this.legacy.stop(); await this.plugins.stop(); await this.savedObjects.stop(); From f90851bdc77c59d415b6ea4e9e9d991965aed109 Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Fri, 10 Jan 2020 17:59:11 -0700 Subject: [PATCH 28/30] Implements minor changes in server.ts from UI Notifications --- src/core/server/pulse/index.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/core/server/pulse/index.ts b/src/core/server/pulse/index.ts index 7f93d0cc8c59c..6599f7739a259 100644 --- a/src/core/server/pulse/index.ts +++ b/src/core/server/pulse/index.ts @@ -38,6 +38,7 @@ export interface PulseSetupDeps { } export type PulseServiceSetup = InternalPulseService; +// eslint-disable-next-line @typescript-eslint/no-empty-interface export interface PulseServiceStart {} interface ChannelResponse { From a0acb4a08cfbf9b305fc4cf7d1a9b98172cc7c22 Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Fri, 10 Jan 2020 18:09:23 -0700 Subject: [PATCH 29/30] Implements more changes --- src/core/server/index.ts | 3 +++ src/core/server/legacy/legacy_service.ts | 1 + src/core/server/plugins/plugin_context.ts | 1 + src/plugins/newsfeed/public/plugin.tsx | 11 ++++++++++- 4 files changed, 15 insertions(+), 1 deletion(-) diff --git a/src/core/server/index.ts b/src/core/server/index.ts index 060265120b865..debf2abc0e465 100644 --- a/src/core/server/index.ts +++ b/src/core/server/index.ts @@ -47,6 +47,7 @@ import { IUiSettingsClient, UiSettingsServiceSetup, UiSettingsServiceStart } fro import { SavedObjectsClientContract } from './saved_objects/types'; import { SavedObjectsServiceSetup, SavedObjectsServiceStart } from './saved_objects'; import { CapabilitiesSetup, CapabilitiesStart } from './capabilities'; +import { PulseServiceSetup } from './pulse'; import { UuidServiceSetup } from './uuid'; export { bootstrap } from './bootstrap'; @@ -274,6 +275,7 @@ export interface CoreSetup { /** {@link UiSettingsServiceSetup} */ uiSettings: UiSettingsServiceSetup; /** {@link UuidServiceSetup} */ + pulse: PulseServiceSetup; uuid: UuidServiceSetup; } @@ -298,5 +300,6 @@ export { PluginsServiceSetup, PluginsServiceStart, PluginOpaqueId, + PulseServiceSetup, UuidServiceSetup, }; diff --git a/src/core/server/legacy/legacy_service.ts b/src/core/server/legacy/legacy_service.ts index 2e8a467eff995..c5e8f8dc16a80 100644 --- a/src/core/server/legacy/legacy_service.ts +++ b/src/core/server/legacy/legacy_service.ts @@ -281,6 +281,7 @@ export class LegacyService implements CoreService { dataClient$: setupDeps.core.elasticsearch.dataClient$, createClient: setupDeps.core.elasticsearch.createClient, }, + pulse: setupDeps.core.pulse, http: { createCookieSessionStorageFactory: setupDeps.core.http.createCookieSessionStorageFactory, registerRouteHandlerContext: setupDeps.core.http.registerRouteHandlerContext.bind( diff --git a/src/core/server/plugins/plugin_context.ts b/src/core/server/plugins/plugin_context.ts index 6e9a7967e9eca..8128113b427a1 100644 --- a/src/core/server/plugins/plugin_context.ts +++ b/src/core/server/plugins/plugin_context.ts @@ -137,6 +137,7 @@ export function createPluginSetupContext( plugin: PluginWrapper ): CoreSetup { return { + pulse: deps.pulse, capabilities: { registerProvider: deps.capabilities.registerProvider, registerSwitcher: deps.capabilities.registerSwitcher, diff --git a/src/plugins/newsfeed/public/plugin.tsx b/src/plugins/newsfeed/public/plugin.tsx index 5ea5e5b324717..e43b7e59c2e99 100644 --- a/src/plugins/newsfeed/public/plugin.tsx +++ b/src/plugins/newsfeed/public/plugin.tsx @@ -38,7 +38,16 @@ export class NewsfeedPublicPlugin implements Plugin { this.kibanaVersion = initializerContext.env.packageInfo.version; } - public setup(core: CoreSetup): Setup {} + public setup(core: CoreSetup): Setup { + const instructions$ = core.pulse.getChannel('errors').instructions$(); + core.pulse.getChannel('errors').sendPulse({ + errorId: 'new_error', + }); + + instructions$.subscribe(instruction => { + // console.log('instruction::', instruction); + }); + } public start(core: CoreStart): Start { const api$ = this.fetchNewsfeed(core); From f030b596c3a10218a670b102b9d86fa731d833ed Mon Sep 17 00:00:00 2001 From: Christiane Heiligers Date: Fri, 10 Jan 2020 18:23:44 -0700 Subject: [PATCH 30/30] Implements changes in src/core/public from UI Notifications. Kibana now starts and we get docs from pulse-poc-raw-errors/_search --- src/core/public/core_system.ts | 7 + src/core/public/index.ts | 5 + src/core/public/plugins/plugin_context.ts | 2 + src/core/public/pulse/channel.ts | 21 +++ src/core/public/pulse/collectors/default.ts | 22 +++ src/core/public/pulse/collectors/errors.ts | 22 +++ .../public/pulse/collectors/notifications.ts | 22 +++ src/core/public/pulse/index.ts | 153 ++++++++++++++++++ 8 files changed, 254 insertions(+) create mode 100644 src/core/public/pulse/channel.ts create mode 100644 src/core/public/pulse/collectors/default.ts create mode 100644 src/core/public/pulse/collectors/errors.ts create mode 100644 src/core/public/pulse/collectors/notifications.ts create mode 100644 src/core/public/pulse/index.ts diff --git a/src/core/public/core_system.ts b/src/core/public/core_system.ts index 2a9dca96062dc..63855adf39f09 100644 --- a/src/core/public/core_system.ts +++ b/src/core/public/core_system.ts @@ -26,6 +26,7 @@ import { ChromeService } from './chrome'; import { FatalErrorsService, FatalErrorsSetup } from './fatal_errors'; import { HttpService } from './http'; import { I18nService } from './i18n'; +import { PulseService } from './pulse'; import { InjectedMetadataParams, InjectedMetadataService, @@ -100,6 +101,7 @@ export class CoreSystem { private readonly rendering: RenderingService; private readonly context: ContextService; private readonly integrations: IntegrationsService; + private readonly pulse: PulseService; private readonly rootDomElement: HTMLElement; private readonly coreContext: CoreContext; @@ -137,6 +139,7 @@ export class CoreSystem { this.rendering = new RenderingService(); this.application = new ApplicationService(); this.integrations = new IntegrationsService(); + this.pulse = new PulseService(); this.coreContext = { coreId: Symbol('core'), env: injectedMetadata.env }; @@ -162,6 +165,7 @@ export class CoreSystem { const http = this.http.setup({ injectedMetadata, fatalErrors: this.fatalErrorsSetup }); const uiSettings = this.uiSettings.setup({ http, injectedMetadata }); const notifications = this.notifications.setup({ uiSettings }); + const pulse = await this.pulse.setup(); const pluginDependencies = this.plugins.getOpaqueIds(); const context = this.context.setup({ @@ -184,6 +188,7 @@ export class CoreSystem { injectedMetadata, notifications, uiSettings, + pulse, }; // Services that do not expose contracts at setup @@ -216,6 +221,7 @@ export class CoreSystem { const i18n = await this.i18n.start(); const application = await this.application.start({ http, injectedMetadata }); await this.integrations.start({ uiSettings }); + const pulse = await this.pulse.start(); const coreUiTargetDomElement = document.createElement('div'); coreUiTargetDomElement.id = 'kibana-body'; @@ -271,6 +277,7 @@ export class CoreSystem { notifications, overlays, uiSettings, + pulse, }; const plugins = await this.plugins.start(core); diff --git a/src/core/public/index.ts b/src/core/public/index.ts index 7488f9b973b71..01226825bbd2e 100644 --- a/src/core/public/index.ts +++ b/src/core/public/index.ts @@ -66,6 +66,7 @@ import { UiSettingsState, IUiSettingsClient } from './ui_settings'; import { ApplicationSetup, Capabilities, ApplicationStart } from './application'; import { DocLinksStart } from './doc_links'; import { SavedObjectsStart } from './saved_objects'; +import { PulseServiceSetup, PulseServiceStart } from './pulse'; export { PackageInfo, EnvironmentMode } from '../server/types'; import { IContextContainer, @@ -177,6 +178,7 @@ export interface CoreSetup { notifications: NotificationsSetup; /** {@link IUiSettingsClient} */ uiSettings: IUiSettingsClient; + pulse: PulseServiceSetup; /** * exposed temporarily until https://github.com/elastic/kibana/issues/41990 done * use *only* to retrieve config values. There is no way to set injected values @@ -219,6 +221,7 @@ export interface CoreStart { i18n: I18nStart; /** {@link NotificationsStart} */ notifications: NotificationsStart; + pulse: PulseServiceStart; /** {@link OverlayStart} */ overlays: OverlayStart; /** {@link IUiSettingsClient} */ @@ -306,4 +309,6 @@ export { PluginOpaqueId, IUiSettingsClient, UiSettingsState, + PulseServiceSetup, + PulseServiceStart, }; diff --git a/src/core/public/plugins/plugin_context.ts b/src/core/public/plugins/plugin_context.ts index 848f46605d4de..de0fe4fd60af9 100644 --- a/src/core/public/plugins/plugin_context.ts +++ b/src/core/public/plugins/plugin_context.ts @@ -104,6 +104,7 @@ export function createPluginSetupContext< http: deps.http, notifications: deps.notifications, uiSettings: deps.uiSettings, + pulse: deps.pulse, injectedMetadata: { getInjectedVar: deps.injectedMetadata.getInjectedVar, }, @@ -147,6 +148,7 @@ export function createPluginStartContext< overlays: deps.overlays, uiSettings: deps.uiSettings, savedObjects: deps.savedObjects, + pulse: deps.pulse, injectedMetadata: { getInjectedVar: deps.injectedMetadata.getInjectedVar, }, diff --git a/src/core/public/pulse/channel.ts b/src/core/public/pulse/channel.ts new file mode 100644 index 0000000000000..77b4a6d2d6416 --- /dev/null +++ b/src/core/public/pulse/channel.ts @@ -0,0 +1,21 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// eslint-disable-next-line @kbn/eslint/no-restricted-paths +export { PulseInstruction, PulseChannel } from '../../server/pulse/channel'; diff --git a/src/core/public/pulse/collectors/default.ts b/src/core/public/pulse/collectors/default.ts new file mode 100644 index 0000000000000..55b0e1b955304 --- /dev/null +++ b/src/core/public/pulse/collectors/default.ts @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export async function getRecords() { + return [{}]; +} diff --git a/src/core/public/pulse/collectors/errors.ts b/src/core/public/pulse/collectors/errors.ts new file mode 100644 index 0000000000000..55b0e1b955304 --- /dev/null +++ b/src/core/public/pulse/collectors/errors.ts @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export async function getRecords() { + return [{}]; +} diff --git a/src/core/public/pulse/collectors/notifications.ts b/src/core/public/pulse/collectors/notifications.ts new file mode 100644 index 0000000000000..55b0e1b955304 --- /dev/null +++ b/src/core/public/pulse/collectors/notifications.ts @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export async function getRecords() { + return [{}]; +} diff --git a/src/core/public/pulse/index.ts b/src/core/public/pulse/index.ts new file mode 100644 index 0000000000000..e236bf14da4b5 --- /dev/null +++ b/src/core/public/pulse/index.ts @@ -0,0 +1,153 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Subject } from 'rxjs'; + +// eslint-disable-next-line @kbn/eslint/no-restricted-paths +import { InstructionsResponse } from '../../server/pulse'; +// eslint-disable-next-line @kbn/eslint/no-restricted-paths +import { PulseChannel, PulseInstruction } from './channel'; +// eslint-disable-next-line @kbn/eslint/no-restricted-paths +import { Fetcher, sendPulse } from '../../server/pulse/send_pulse'; + +export interface PulseServiceSetup { + getChannel: (id: string) => PulseChannel; +} + +// eslint-disable-next-line @typescript-eslint/no-empty-interface +export interface PulseServiceStart {} + +const channelNames = ['default', 'notifications', 'errors']; + +export class PulseService { + private retriableErrors = 0; + private readonly channels: Map; + private readonly instructions: Map> = new Map(); + + constructor() { + this.channels = new Map( + channelNames.map((id): [string, PulseChannel] => { + const instructions$ = new Subject(); + this.instructions.set(id, instructions$); + const channel = new PulseChannel({ id, instructions$ }); + return [channel.id, channel]; + }) + ); + } + + public async setup(): Promise { + // poll for instructions every second for this deployment + setInterval(() => { + // eslint-disable-next-line no-console + this.loadInstructions().catch(err => console.error(err.stack)); + }, 10000); + + // eslint-disable-next-line no-console + console.log('Will attempt first telemetry collection in 5 seconds...'); + setTimeout(() => { + setInterval(() => { + // eslint-disable-next-line no-console + this.sendTelemetry().catch(err => console.error(err.stack)); + }, 5000); + }, 5000); + + return { + getChannel: (id: string) => { + const channel = this.channels.get(id); + if (!channel) { + throw new Error(`Unknown channel: ${id}`); + } + return channel; + }, + }; + } + + private async sendTelemetry() { + const fetcher: Fetcher = async (url, channels) => { + return await fetch(url, { + method: 'post', + + headers: { + 'content-type': 'application/json', + 'kbn-xsrf': 'true', + }, + body: JSON.stringify({ + channels, + }), + }); + }; + + return await sendPulse(this.channels, fetcher); + } + + private async loadInstructions() { + const url = 'http://localhost:5601/api/pulse_poc/instructions/123'; + let response: any; + try { + response = await fetch(url); + } catch (err) { + if (!err.message.includes('ECONNREFUSED')) { + throw err; + } + this.handleRetriableError(); + return; + } + if (response.status === 503) { + this.handleRetriableError(); + return; + } + + if (response.status !== 200) { + const responseBody = await response.text(); + throw new Error(`${response.status}: ${responseBody}`); + } + + const responseBody: InstructionsResponse = await response.json(); + + responseBody.channels.forEach(channel => { + const instructions$ = this.instructions.get(channel.id); + if (!instructions$) { + throw new Error( + `Channel (${channel.id}) from service has no corresponding channel handler in client` + ); + } + + channel.instructions.forEach(instruction => instructions$.next(instruction)); + }); + } + + private handleRetriableError() { + this.retriableErrors++; + if (this.retriableErrors === 1) { + // eslint-disable-next-line no-console + console.warn( + 'Kibana is not yet available at http://localhost:5601/api, will continue to check for the next 120 seconds...' + ); + } else if (this.retriableErrors > 120) { + this.retriableErrors = 0; + } + } + + async start(): Promise { + return {}; + } + public stop() { + // nothing to do here currently + } +}