Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/core/public/pulse/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ export interface PulseServiceStart {}

const channelNames = ['default', 'notifications', 'errors'];

const logger = {
...console,
// eslint-disable-next-line no-console
fatal: (...args: any[]) => console.error(...args),
get: () => logger,
};

export class PulseService {
private retriableErrors = 0;
private readonly channels: Map<string, PulseChannel>;
Expand All @@ -45,7 +52,7 @@ export class PulseService {
channelNames.map((id): [string, PulseChannel] => {
const instructions$ = new Subject<PulseInstruction>();
this.instructions.set(id, instructions$);
const channel = new PulseChannel({ id, instructions$ });
const channel = new PulseChannel({ id, instructions$, logger });
return [channel.id, channel];
})
);
Expand Down Expand Up @@ -120,7 +127,7 @@ export class PulseService {

const responseBody: InstructionsResponse = await response.json();

responseBody.channels.forEach(channel => {
responseBody.channels.forEach((channel: PulseChannel) => {
const instructions$ = this.instructions.get(channel.id);
if (!instructions$) {
throw new Error(
Expand Down
28 changes: 22 additions & 6 deletions src/core/server/pulse/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,43 @@
*/

import { Subject } from 'rxjs';
// import { IClusterClient } from '../elasticsearch';
import { PulseCollectorConstructor } from './types';
import { IClusterClient } from '../elasticsearch';
import { SavedObjectsServiceSetup } from '../saved_objects';
import { Logger } from '../logging';

export interface PulseInstruction {
owner: string;
id: string;
value: unknown;
}

interface ChannelConfig {
export interface ChannelConfig {
id: string;
instructions$: Subject<PulseInstruction>;
logger: Logger;
}
export interface ChannelSetupContext {
elasticsearch: IClusterClient;
savedObjects: SavedObjectsServiceSetup;
}

export class PulseChannel {
public readonly getRecords: () => Promise<Record<string, any>>;
export class PulseChannel<Payload = any, Rec = Payload> {
private readonly collector: any;

constructor(private readonly config: ChannelConfig) {
this.collector = require(`${__dirname}/collectors/${this.id}`);
this.getRecords = this.collector.getRecords;
const Collector: PulseCollectorContructor = require(`${__dirname}/collectors/${this.id}`)
.Collector;
this.collector = new Collector(this.config.logger);
}

public async setup(setupContext: ChannelSetupContexxt) {
return this.collector.setup(setupContext);
}

public async getRecords() {
return this.collector.getRecords();
}
public get id() {
return this.config.id;
}
Expand Down
21 changes: 12 additions & 9 deletions src/core/server/pulse/collectors/default.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@
* under the License.
*/

export async function getRecords() {
return [];
}
// import { IClusterClient } from '../../elasticsearch';

// export async function getRecords(elasticsearch: IClusterClient) {
// const pingResult = await elasticsearch.callAsInternalUser('ping');
import { PulseCollector } from '../types';
export class Collector extends PulseCollector<unknown, { ping_received: boolean }> {
public async putRecord() {}
public async getRecords() {
if (this.elasticsearch) {
const pingResult = await this.elasticsearch.callAsInternalUser('ping');

// return [{ ping_received: pingResult }];
// }
return [{ ping_received: pingResult }];
}
return [];
// throw Error(`Default collector not initialised with an "elasticsearch" client!`);
}
}
52 changes: 46 additions & 6 deletions src/core/server/pulse/collectors/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,56 @@
// be stored as an individual document in the errors channel index
// by the service

import { PulseCollector, CollectorSetupContext } from '../types';

export interface Payload {
errorId: string;
}

const payloads: Payload[] = [];
export class Collector extends PulseCollector<Payload> {
private payloads: Payload[] = [];
private readonly indexName = '.pulse-errors';

export async function putRecord(payload: Payload) {
payloads.push(payload);
}
public async setup(deps: CollectorSetupContext) {
await super.setup(deps);
const exists = await this.elasticsearch!.callAsInternalUser('indices.exists', {
index: this.indexName,
});
if (!exists) {
await this.elasticsearch!.callAsInternalUser('indices.create', {
index: this.indexName,
body: {
settings: {
number_of_shards: 1,
},
mappings: {
properties: {
errorId: {
type: 'keyword',
},
},
},
},
});
}
}
public async putRecord(payload: Payload) {
this.payloads.push(payload);
if (this.elasticsearch) {
await this.elasticsearch.callAsInternalUser('create', {
index: this.indexName,
body: payload,
});
}
}

export async function getRecords() {
return payloads;
public async getRecords() {
if (this.elasticsearch) {
const results = await this.elasticsearch.callAsInternalUser('search', {
index: this.indexName,
});
// TODO: Set results as sent and return them
}
return this.payloads.splice(0, this.payloads.length);
}
}
9 changes: 7 additions & 2 deletions src/core/server/pulse/collectors/notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
* under the License.
*/

export async function getRecords() {
return [];
import { PulseCollector } from '../types';

export class Collector extends PulseCollector {
public async putRecord() {}
public async getRecords() {
return [];
}
}
106 changes: 18 additions & 88 deletions src/core/server/pulse/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ import { Subject } from 'rxjs';
import fetch from 'node-fetch';
import { CoreContext } from '../core_context';
import { Logger } from '../logging';
import { ElasticsearchServiceSetup } from '../elasticsearch';
import { ElasticsearchServiceSetup, IClusterClient } from '../elasticsearch';
import { PulseChannel, PulseInstruction } from './channel';
import { sendPulse, Fetcher } from './send_pulse';
import { SavedObjectsServiceSetup } from '../saved_objects';

export interface InternalPulseService {
getChannel: (id: string) => PulseChannel;
}

export interface PulseSetupDeps {
elasticsearch: ElasticsearchServiceSetup;
savedObjects: SavedObjectsServiceSetup;
}

export type PulseServiceSetup = InternalPulseService;
// eslint-disable-next-line @typescript-eslint/no-empty-interface
export interface PulseServiceStart {}

interface ChannelResponse {
id: string;
Expand All @@ -62,16 +62,15 @@ export class PulseService {
private readonly log: Logger;
private readonly channels: Map<string, PulseChannel>;
private readonly instructions$: Map<string, Subject<any>> = new Map();
// private readonly subscriptions: Set<NodeJS.Timer> = new Set();
// private elasticsearch?: IClusterClient;
private elasticsearch?: IClusterClient;

constructor(coreContext: CoreContext) {
this.log = coreContext.logger.get('pulse-service');
this.channels = new Map(
channelNames.map((id): [string, PulseChannel] => {
const instructions$ = new Subject<PulseInstruction>();
this.instructions$.set(id, instructions$);
const channel = new PulseChannel({ id, instructions$ });
const channel = new PulseChannel({ id, instructions$, logger: this.log });
return [channel.id, channel];
})
);
Expand All @@ -80,18 +79,23 @@ export class PulseService {
public async setup(deps: PulseSetupDeps): Promise<InternalPulseService> {
this.log.debug('Setting up pulse service');

this.elasticsearch = deps.elasticsearch.createClient('pulse-service');
this.channels.forEach(channel =>
channel.setup({
elasticsearch: this.elasticsearch!,
savedObjects: deps.savedObjects,
})
);
// poll for instructions every second for this deployment
setInterval(() => {
// eslint-disable-next-line no-console
this.loadInstructions().catch(err => console.error(err.stack));
}, 1000);
this.loadInstructions().catch(err => this.log.error(err.stack));
}, 10000);

this.log.debug('Will attempt first telemetry collection in 5 seconds...');

// eslint-disable-next-line no-console
console.log('Will attempt first telemetry collection in 5 seconds...');
setTimeout(() => {
setInterval(() => {
// eslint-disable-next-line no-console
this.sendTelemetry().catch(err => console.error(err.stack));
this.sendTelemetry().catch(err => this.log.error(err.stack));
}, 5000);
}, 5000);

Expand Down Expand Up @@ -145,8 +149,7 @@ export class PulseService {
private handleRetriableError() {
this.retriableErrors++;
if (this.retriableErrors === 1) {
// eslint-disable-next-line no-console
console.warn(
this.log.warn(
'Kibana is not yet available at http://localhost:5601/api, will continue to check for the next 120 seconds...'
);
} else if (this.retriableErrors > 120) {
Expand All @@ -172,76 +175,3 @@ export class PulseService {
return await sendPulse(this.channels, fetcher);
}
}

// public async start() {
// this.log.info('Starting service');
// if (!this.elasticsearch) {
// throw Error(`The 'PulseService.setup' method needs to be called before the 'start' method`);
// }
// const elasticsearch = this.elasticsearch;

// // poll for instructions every second for this deployment
// const loadInstructionSubcription = setInterval(() => {
// this.loadInstructions().catch(err => this.log.error(err.stack));
// }, 1000);
// this.subscriptions.add(loadInstructionSubcription);

// this.log.debug('Will attempt first telemetry collection in 5 seconds...');
// const sendTelemetrySubcription = setInterval(() => {
// this.sendTelemetry(elasticsearch).catch(err => this.log.error(err.stack));
// }, 5000);
// this.subscriptions.add(sendTelemetrySubcription);
// }

// public async stop() {
// this.subscriptions.forEach(subscription => {
// clearInterval(subscription);
// this.subscriptions.delete(subscription);
// });
// }

// private retriableErrors = 0;

// private async sendTelemetry(elasticsearch: IClusterClient) {
// this.log.debug('Sending telemetry');
// const url = 'http://localhost:5601/api/pulse_poc/intake/123';

// const channels = [];
// for (const channel of this.channels.values()) {
// const records = await channel.getRecords(elasticsearch);
// this.log.debug(`Channel "${channel.id}" returns the records ${JSON.stringify(records)}`);
// channels.push({
// records,
// channel_id: channel.id,
// });
// }

// let response: any;
// try {
// response = await fetch(url, {
// method: 'post',
// headers: {
// 'content-type': 'application/json',
// 'kbn-xsrf': 'true',
// },
// body: JSON.stringify({
// channels,
// }),
// });
// } catch (err) {
// if (!err.message.includes('ECONNREFUSED')) {
// throw err;
// }
// // the instructions polling should handle logging for this case, yay for POCs
// return;
// }
// if (response.status === 503) {
// // the instructions polling should handle logging for this case, yay for POCs
// return;
// }

// if (response.status !== 200) {
// const responseBody = await response.text();
// throw new Error(`${response.status}: ${responseBody}`);
// }
// }
44 changes: 44 additions & 0 deletions src/core/server/pulse/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { IClusterClient } from '../elasticsearch';
import { SavedObjectsServiceSetup, ISavedObjectsRepository } from '../saved_objects';
import { Logger } from '../logging';

export type PulseCollectorConstructor = new (logger: Logger) => PulseCollector;

export interface CollectorSetupContext {
elasticsearch: IClusterClient;
savedObjects: SavedObjectsServiceSetup;
}

export abstract class PulseCollector<Payload = unknown, PulseRecord = Payload> {
protected savedObjects?: ISavedObjectsRepository;
protected elasticsearch?: IClusterClient;

constructor(protected readonly logger: Logger) {}

public abstract async putRecord(payload: Payload): Promise<void>;
public abstract async getRecords(): Promise<PulseRecord[]>;

public async setup(setupContext: CollectorSetupContext) {
this.savedObjects = setupContext.savedObjects.createInternalRepository();
this.elasticsearch = setupContext.elasticsearch;
}
}
Loading