diff --git a/ee/server/broker.ts b/ee/server/broker.ts index 3b6aec4e6ef4d..c987e5c9bccdc 100644 --- a/ee/server/broker.ts +++ b/ee/server/broker.ts @@ -3,7 +3,7 @@ import EJSON from 'ejson'; import { asyncLocalStorage, License } from '../../server/sdk'; import { api } from '../../server/sdk/api'; -import { IBroker, IBrokerNode } from '../../server/sdk/types/IBroker'; +import { IBroker, IBrokerNode, IServiceMetrics } from '../../server/sdk/types/IBroker'; import { ServiceClass } from '../../server/sdk/types/ServiceClass'; import { EventSignatures } from '../../server/sdk/lib/Events'; import { LocalBroker } from '../../server/sdk/lib/LocalBroker'; @@ -45,11 +45,15 @@ class NetworkBroker implements IBroker { // list of allowed services to run - has precedence over `internalOnly` private allowedList = new Set(SERVICES_ALLOWED?.split(',').map((i) => i.trim()).filter((i) => i)); + metrics: IServiceMetrics; + constructor(broker: ServiceBroker) { this.broker = broker; api.setBroker(this); + this.metrics = broker.metrics; + this.started = this.broker.start(); this.allowed = License.hasLicense('scalability'); diff --git a/ee/server/services/ddp-streamer/DDPStreamer.ts b/ee/server/services/ddp-streamer/DDPStreamer.ts index bd0952b9223e6..3eacaee44a1fc 100644 --- a/ee/server/services/ddp-streamer/DDPStreamer.ts +++ b/ee/server/services/ddp-streamer/DDPStreamer.ts @@ -7,10 +7,11 @@ import WebSocket from 'ws'; import { Client } from './Client'; // import { STREAMER_EVENTS, STREAM_NAMES } from './constants'; import { ServiceClass } from '../../../../server/sdk/types/ServiceClass'; -import { events } from './configureServer'; +import { events, server } from './configureServer'; import notifications from './streams/index'; import { StreamerCentral } from '../../../../server/modules/streamer/streamer.module'; import { ListenersModule } from '../../../../server/modules/listeners/listeners.module'; +import { DDP_EVENTS } from './constants'; const { PORT: port = 4000, @@ -81,24 +82,6 @@ wss.on('connection', (ws, req) => new Client(ws, req.url !== '/websocket')); // }, // }; -// broker.createService({ -// settings: { -// port: PROMETHEUS_PORT, -// metrics: { -// streamer_users_connected: { -// type: 'Gauge', -// labelNames: ['nodeID'], -// help: 'Users connecteds by streamer', -// }, -// streamer_users_logged: { -// type: 'Gauge', -// labelNames: ['nodeID'], -// help: 'Users logged by streamer', -// }, -// }, -// }, -// mixins: PROMETHEUS_PORT !== 'false' ? [PromService] : [], - export class DDPStreamer extends ServiceClass { protected name = 'streamer'; @@ -142,6 +125,51 @@ export class DDPStreamer extends ServiceClass { }); }); } + + async created(): Promise { + if (!this.context) { + return; + } + + const { broker, nodeID } = this.context; + if (!broker) { + return; + } + + const { metrics } = broker; + if (!metrics) { + return; + } + + metrics.register({ + name: 'users_connected', + type: 'gauge', + labelNames: ['nodeID'], + description: 'Users connected by streamer', + }); + + metrics.register({ + name: 'users_logged', + type: 'gauge', + labelNames: ['nodeID'], + description: 'Users logged by streamer', + }); + + server.on(DDP_EVENTS.CONNECTED, () => { + metrics.increment('users_connected', { nodeID }, 1); + }); + + server.on(DDP_EVENTS.LOGGED, () => { + metrics.increment('users_logged', { nodeID }, 1); + }); + + server.on(DDP_EVENTS.DISCONNECTED, ({ userId }) => { + metrics.decrement('users_connected', { nodeID }, 1); + if (userId) { + metrics.decrement('users_logged', { nodeID }, 1); + } + }); + } } // broker.start(); diff --git a/server/sdk/types/IBroker.ts b/server/sdk/types/IBroker.ts index 4f4d21f7c724c..b5c3eda737ee4 100644 --- a/server/sdk/types/IBroker.ts +++ b/server/sdk/types/IBroker.ts @@ -20,7 +20,33 @@ export interface IBrokerNode { // offlineSince: null } +export type BaseMetricOptions = { + type: string; + name: string; + description?: string; + labelNames?: Array; + unit?: string; + aggregator?: string; +} + +export interface IServiceMetrics { + register(opts: BaseMetricOptions): void; + + hasMetric(name: string): boolean; + + increment(name: string, labels?: Record, value?: number, timestamp?: number): void; + decrement(name: string, labels?: Record, value?: number, timestamp?: number): void; + set(name: string, value: any | null, labels?: Record, timestamp?: number): void; + observe(name: string, value: number, labels?: Record, timestamp?: number): void; + + reset(name: string, labels?: Record, timestamp?: number): void; + resetAll(name: string, timestamp?: number): void; + + timer(name: string, labels?: Record, timestamp?: number): () => number; +} + export interface IBroker { + metrics?: IServiceMetrics; destroyService(service: ServiceClass): void; createService(service: ServiceClass): void; call(method: string, data: any): Promise;