From fa20df70b013c2f118b45b7098af457a23627a5d Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Tue, 3 Nov 2020 17:32:56 -0300 Subject: [PATCH 1/5] Add metrics to services --- ee/server/broker.ts | 6 +++++- server/sdk/types/IBroker.ts | 26 ++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/ee/server/broker.ts b/ee/server/broker.ts index 3b6aec4e6ef4d..c987e5c9bccdc 100644 --- a/ee/server/broker.ts +++ b/ee/server/broker.ts @@ -3,7 +3,7 @@ import EJSON from 'ejson'; import { asyncLocalStorage, License } from '../../server/sdk'; import { api } from '../../server/sdk/api'; -import { IBroker, IBrokerNode } from '../../server/sdk/types/IBroker'; +import { IBroker, IBrokerNode, IServiceMetrics } from '../../server/sdk/types/IBroker'; import { ServiceClass } from '../../server/sdk/types/ServiceClass'; import { EventSignatures } from '../../server/sdk/lib/Events'; import { LocalBroker } from '../../server/sdk/lib/LocalBroker'; @@ -45,11 +45,15 @@ class NetworkBroker implements IBroker { // list of allowed services to run - has precedence over `internalOnly` private allowedList = new Set(SERVICES_ALLOWED?.split(',').map((i) => i.trim()).filter((i) => i)); + metrics: IServiceMetrics; + constructor(broker: ServiceBroker) { this.broker = broker; api.setBroker(this); + this.metrics = broker.metrics; + this.started = this.broker.start(); this.allowed = License.hasLicense('scalability'); diff --git a/server/sdk/types/IBroker.ts b/server/sdk/types/IBroker.ts index 4f4d21f7c724c..529c977e2ab2d 100644 --- a/server/sdk/types/IBroker.ts +++ b/server/sdk/types/IBroker.ts @@ -20,7 +20,33 @@ export interface IBrokerNode { // offlineSince: null } +export type BaseMetricOptions = { + type: string; + name: string; + description?: string; + labelNames?: Array; + unit?: string; + aggregator?: string; +} + +export interface IServiceMetrics { + register(opts: BaseMetricOptions): void; + + hasMetric(name: string): boolean; + + increment(name: string, labels?: Record, value?: number, timestamp?: number): void; + decrement(name: string, labels?: Record, value?: number, timestamp?: number): void; + set(name: string, value: any | null, labels?: Record, timestamp?: number): void; + observe(name: string, value: number, labels?: Record, timestamp?: number): void; + + reset(name: string, labels?: Record, timestamp?: number): void; + resetAll(name: string, timestamp?: number): void; + + timer(name: string, labels?: Record, timestamp?: number): () => number; +} + export interface IBroker { + metrics: IServiceMetrics; destroyService(service: ServiceClass): void; createService(service: ServiceClass): void; call(method: string, data: any): Promise; From b4654c1b46f2f0820a205a0c5733e6e22f6c957b Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Tue, 3 Nov 2020 17:33:03 -0300 Subject: [PATCH 2/5] Add ddp-streamer metrics --- .../services/ddp-streamer/DDPStreamer.ts | 58 +++++++++++++------ 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/ee/server/services/ddp-streamer/DDPStreamer.ts b/ee/server/services/ddp-streamer/DDPStreamer.ts index bd0952b9223e6..37f3327ca0962 100644 --- a/ee/server/services/ddp-streamer/DDPStreamer.ts +++ b/ee/server/services/ddp-streamer/DDPStreamer.ts @@ -7,10 +7,11 @@ import WebSocket from 'ws'; import { Client } from './Client'; // import { STREAMER_EVENTS, STREAM_NAMES } from './constants'; import { ServiceClass } from '../../../../server/sdk/types/ServiceClass'; -import { events } from './configureServer'; +import { events, server } from './configureServer'; import notifications from './streams/index'; import { StreamerCentral } from '../../../../server/modules/streamer/streamer.module'; import { ListenersModule } from '../../../../server/modules/listeners/listeners.module'; +import { DDP_EVENTS } from './constants'; const { PORT: port = 4000, @@ -81,24 +82,6 @@ wss.on('connection', (ws, req) => new Client(ws, req.url !== '/websocket')); // }, // }; -// broker.createService({ -// settings: { -// port: PROMETHEUS_PORT, -// metrics: { -// streamer_users_connected: { -// type: 'Gauge', -// labelNames: ['nodeID'], -// help: 'Users connecteds by streamer', -// }, -// streamer_users_logged: { -// type: 'Gauge', -// labelNames: ['nodeID'], -// help: 'Users logged by streamer', -// }, -// }, -// }, -// mixins: PROMETHEUS_PORT !== 'false' ? [PromService] : [], - export class DDPStreamer extends ServiceClass { protected name = 'streamer'; @@ -142,6 +125,43 @@ export class DDPStreamer extends ServiceClass { }); }); } + + async created(): Promise { + if (!this.context) { + return; + } + + const { broker, nodeID } = this.context; + + broker.metrics.register({ + name: 'users.connected', + type: 'gauge', + labelNames: ['nodeID'], + description: 'Users connected by streamer', + }); + + broker.metrics.register({ + name: 'users.logged', + type: 'gauge', + labelNames: ['nodeID'], + description: 'Users logged by streamer', + }); + + server.on(DDP_EVENTS.CONNECTED, () => { + broker.metrics.increment('users.connected', { nodeID }, 1); + }); + + server.on(DDP_EVENTS.LOGGED, () => { + broker.metrics.increment('users.logged', { nodeID }, 1); + }); + + server.on(DDP_EVENTS.DISCONNECTED, ({ userId }) => { + broker.metrics.decrement('users.connected', { nodeID }, 1); + if (userId) { + broker.metrics.decrement('users.logged', { nodeID }, 1); + } + }); + } } // broker.start(); From 51c381596b9e87ea9744b6c9349f7060a24faf00 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Tue, 3 Nov 2020 17:33:56 -0300 Subject: [PATCH 3/5] Temp LocalBroker metrics --- server/sdk/lib/LocalBroker.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/sdk/lib/LocalBroker.ts b/server/sdk/lib/LocalBroker.ts index 636e81e51f24b..72df8452cc2c3 100644 --- a/server/sdk/lib/LocalBroker.ts +++ b/server/sdk/lib/LocalBroker.ts @@ -1,6 +1,6 @@ import { EventEmitter } from 'events'; -import { IBroker, IBrokerNode } from '../types/IBroker'; +import { IBroker, IBrokerNode, IServiceMetrics } from '../types/IBroker'; import { ServiceClass } from '../types/ServiceClass'; import { asyncLocalStorage } from '..'; import { EventSignatures } from './Events'; @@ -10,6 +10,8 @@ export class LocalBroker implements IBroker { private events = new EventEmitter(); + metrics: IServiceMetrics; + async call(method: string, data: any): Promise { const result = await asyncLocalStorage.run({ id: 'ctx.id', From ac62d7898a4dda878d5ad0cd5d7828fe790f379e Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Tue, 3 Nov 2020 17:40:23 -0300 Subject: [PATCH 4/5] Use underscore on metrics names --- ee/server/services/ddp-streamer/DDPStreamer.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/ee/server/services/ddp-streamer/DDPStreamer.ts b/ee/server/services/ddp-streamer/DDPStreamer.ts index 37f3327ca0962..b3550307f9936 100644 --- a/ee/server/services/ddp-streamer/DDPStreamer.ts +++ b/ee/server/services/ddp-streamer/DDPStreamer.ts @@ -134,31 +134,31 @@ export class DDPStreamer extends ServiceClass { const { broker, nodeID } = this.context; broker.metrics.register({ - name: 'users.connected', + name: 'users_connected', type: 'gauge', labelNames: ['nodeID'], description: 'Users connected by streamer', }); broker.metrics.register({ - name: 'users.logged', + name: 'users_logged', type: 'gauge', labelNames: ['nodeID'], description: 'Users logged by streamer', }); server.on(DDP_EVENTS.CONNECTED, () => { - broker.metrics.increment('users.connected', { nodeID }, 1); + broker.metrics.increment('users_connected', { nodeID }, 1); }); server.on(DDP_EVENTS.LOGGED, () => { - broker.metrics.increment('users.logged', { nodeID }, 1); + broker.metrics.increment('users_logged', { nodeID }, 1); }); server.on(DDP_EVENTS.DISCONNECTED, ({ userId }) => { - broker.metrics.decrement('users.connected', { nodeID }, 1); + broker.metrics.decrement('users_connected', { nodeID }, 1); if (userId) { - broker.metrics.decrement('users.logged', { nodeID }, 1); + broker.metrics.decrement('users_logged', { nodeID }, 1); } }); } From 7038b8d0e84d8913c66b966e5fbbccf87760b0c5 Mon Sep 17 00:00:00 2001 From: Diego Sampaio Date: Tue, 3 Nov 2020 18:43:48 -0300 Subject: [PATCH 5/5] Make metrics optional --- .../services/ddp-streamer/DDPStreamer.ts | 20 +++++++++++++------ server/sdk/lib/LocalBroker.ts | 4 +--- server/sdk/types/IBroker.ts | 2 +- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/ee/server/services/ddp-streamer/DDPStreamer.ts b/ee/server/services/ddp-streamer/DDPStreamer.ts index b3550307f9936..3eacaee44a1fc 100644 --- a/ee/server/services/ddp-streamer/DDPStreamer.ts +++ b/ee/server/services/ddp-streamer/DDPStreamer.ts @@ -132,15 +132,23 @@ export class DDPStreamer extends ServiceClass { } const { broker, nodeID } = this.context; + if (!broker) { + return; + } + + const { metrics } = broker; + if (!metrics) { + return; + } - broker.metrics.register({ + metrics.register({ name: 'users_connected', type: 'gauge', labelNames: ['nodeID'], description: 'Users connected by streamer', }); - broker.metrics.register({ + metrics.register({ name: 'users_logged', type: 'gauge', labelNames: ['nodeID'], @@ -148,17 +156,17 @@ export class DDPStreamer extends ServiceClass { }); server.on(DDP_EVENTS.CONNECTED, () => { - broker.metrics.increment('users_connected', { nodeID }, 1); + metrics.increment('users_connected', { nodeID }, 1); }); server.on(DDP_EVENTS.LOGGED, () => { - broker.metrics.increment('users_logged', { nodeID }, 1); + metrics.increment('users_logged', { nodeID }, 1); }); server.on(DDP_EVENTS.DISCONNECTED, ({ userId }) => { - broker.metrics.decrement('users_connected', { nodeID }, 1); + metrics.decrement('users_connected', { nodeID }, 1); if (userId) { - broker.metrics.decrement('users_logged', { nodeID }, 1); + metrics.decrement('users_logged', { nodeID }, 1); } }); } diff --git a/server/sdk/lib/LocalBroker.ts b/server/sdk/lib/LocalBroker.ts index 72df8452cc2c3..636e81e51f24b 100644 --- a/server/sdk/lib/LocalBroker.ts +++ b/server/sdk/lib/LocalBroker.ts @@ -1,6 +1,6 @@ import { EventEmitter } from 'events'; -import { IBroker, IBrokerNode, IServiceMetrics } from '../types/IBroker'; +import { IBroker, IBrokerNode } from '../types/IBroker'; import { ServiceClass } from '../types/ServiceClass'; import { asyncLocalStorage } from '..'; import { EventSignatures } from './Events'; @@ -10,8 +10,6 @@ export class LocalBroker implements IBroker { private events = new EventEmitter(); - metrics: IServiceMetrics; - async call(method: string, data: any): Promise { const result = await asyncLocalStorage.run({ id: 'ctx.id', diff --git a/server/sdk/types/IBroker.ts b/server/sdk/types/IBroker.ts index 529c977e2ab2d..b5c3eda737ee4 100644 --- a/server/sdk/types/IBroker.ts +++ b/server/sdk/types/IBroker.ts @@ -46,7 +46,7 @@ export interface IServiceMetrics { } export interface IBroker { - metrics: IServiceMetrics; + metrics?: IServiceMetrics; destroyService(service: ServiceClass): void; createService(service: ServiceClass): void; call(method: string, data: any): Promise;