diff --git a/bun.lock b/bun.lock index 86022aee..d81574ac 100644 --- a/bun.lock +++ b/bun.lock @@ -54,7 +54,7 @@ }, "packages/federation-sdk": { "name": "@rocket.chat/federation-sdk", - "version": "0.3.8", + "version": "0.3.9", "dependencies": { "@opentelemetry/api": "^1.9.0", "@rocket.chat/emitter": "^0.31.25", @@ -62,6 +62,7 @@ "@rocket.chat/federation-crypto": "workspace:*", "@rocket.chat/federation-room": "workspace:*", "mongodb": "^6.16.0", + "prom-client": "^15.1.3", "reflect-metadata": "^0.2.2", "tsyringe": "^4.10.0", "tweetnacl": "^1.0.3", @@ -288,13 +289,15 @@ "base64-js": ["base64-js@1.5.1", "", {}, "sha512-AKpaYlHn8t4SVbOHCy+b5+KKgvR4vrsD8vbvrbiQJps7fKDTkjkDry6ji0rUJjC0kzbNePLwzxq8iypo41qeWA=="], + "bintrees": ["bintrees@1.0.2", "", {}, "sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw=="], + "braces": ["braces@3.0.3", "", { "dependencies": { "fill-range": "^7.1.1" } }, "sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA=="], "bson": ["bson@6.10.4", "", {}, "sha512-WIsKqkSC0ABoBJuT1LEX+2HEvNmNKKgnTAyd0fL8qzK4SH2i9NXg+t08YtdZp/V9IZ33cxe3iV4yM0qg8lMQng=="], "buffer": ["buffer@6.0.3", "", { "dependencies": { "base64-js": "^1.3.1", "ieee754": "^1.2.1" } }, "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA=="], - "bun-types": ["bun-types@1.3.5", "", { "dependencies": { "@types/node": "*" } }, "sha512-inmAYe2PFLs0SUbFOWSVD24sg1jFlMPxOjOSSCYqUgn4Hsc3rDc7dFvfVYjFPNHtov6kgUeulV4SxbuIV/stPw=="], + "bun-types": ["bun-types@1.3.8", "", { "dependencies": { "@types/node": "*" } }, "sha512-fL99nxdOWvV4LqjmC+8Q9kW3M4QTtTR1eePs94v5ctGqU8OeceWrSUaRw3JYb7tU3FkMIAjkueehrHPPPGKi5Q=="], "chalk": ["chalk@5.4.1", "", {}, "sha512-zgVZuo2WcZgfUEmsn6eO3kINexW8RAE4maiQ8QNs8CtpPCSyMiYsULR3HQYkm3w8FIA3SberyMJMSldGsW+U3w=="], @@ -456,6 +459,8 @@ "process-warning": ["process-warning@3.0.0", "", {}, "sha512-mqn0kFRl0EoqhnL0GQ0veqFHyIN1yig9RHh/InzORTUiZHFRAur+aMtRkELNwGs9aNwKS6tg/An4NYBPGwvtzQ=="], + "prom-client": ["prom-client@15.1.3", "", { "dependencies": { "@opentelemetry/api": "^1.4.0", "tdigest": "^0.1.1" } }, "sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g=="], + "pump": ["pump@3.0.3", "", { "dependencies": { "end-of-stream": "^1.1.0", "once": "^1.3.1" } }, "sha512-todwxLMY7/heScKmntwQG8CXVkWUOdYxIvY2s0VWAAMh/nd8SoYiRaKjlr7+iCs984f2P8zvrfWcDDYVb73NfA=="], "punycode": ["punycode@2.3.1", "", {}, "sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg=="], @@ -516,6 +521,8 @@ "supports-preserve-symlinks-flag": ["supports-preserve-symlinks-flag@1.0.0", "", {}, "sha512-ot0WnXS9fgdkgIcePe6RHNk1WA8+muPa6cSjeR3V8K27q9BB1rTE3R1p7Hv0z1ZyAc8s6Vvv8DIyWf681MAt0w=="], + "tdigest": ["tdigest@0.1.2", "", { "dependencies": { "bintrees": "1.0.2" } }, "sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA=="], + "thread-stream": ["thread-stream@2.7.0", "", { "dependencies": { "real-require": "^0.2.0" } }, "sha512-qQiRWsU/wvNolI6tbbCKd9iKaTnCXsTwVxhhKM6nctPdujTyztjlbUkUTUymidWcMnZ5pWR0ej4a0tjsW021vw=="], "to-regex-range": ["to-regex-range@5.0.1", "", { "dependencies": { "is-number": "^7.0.0" } }, "sha512-65P7iz6X5yEr1cwcgvQxbbIw7Uk3gOy5dIdtZ4rDveLqhrdJP+Li/Hx6tyK0NEb+2GCyneCMJiGqrADCSNk8sQ=="], diff --git a/packages/federation-sdk/package.json b/packages/federation-sdk/package.json index ac2b1242..8dd9010a 100644 --- a/packages/federation-sdk/package.json +++ b/packages/federation-sdk/package.json @@ -17,6 +17,7 @@ }, "dependencies": { "@opentelemetry/api": "^1.9.0", + "prom-client": "^15.1.3", "@rocket.chat/emitter": "^0.31.25", "@rocket.chat/federation-core": "workspace:*", "@rocket.chat/federation-crypto": "workspace:*", diff --git a/packages/federation-sdk/src/index.ts b/packages/federation-sdk/src/index.ts index 5d41bf72..24453b32 100644 --- a/packages/federation-sdk/src/index.ts +++ b/packages/federation-sdk/src/index.ts @@ -78,6 +78,19 @@ export { type ITracedClassOptions, } from './utils/tracing'; +export { federationMetrics, initMetrics } from './metrics'; +export { + bucketizeEduCount, + bucketizePduCount, + determineMessageType, + extractOriginFromMatrixRoomId, + extractOriginFromMatrixUserId, + getEventTypeLabel, +} from './metrics/helpers'; + +// Event emitter types +export type { EventHandlerExceptionHandler } from './services/event-emitter.service'; + export type HomeserverEventSignatures = { 'homeserver.ping': { message: string; diff --git a/packages/federation-sdk/src/metrics/helpers.ts b/packages/federation-sdk/src/metrics/helpers.ts new file mode 100644 index 00000000..22016d8c --- /dev/null +++ b/packages/federation-sdk/src/metrics/helpers.ts @@ -0,0 +1,87 @@ +/** + * Extracts the origin server domain from a Matrix room ID. + * @example extractOriginFromMatrixRoomId('!room:matrix.org') // 'matrix.org' + */ +export function extractOriginFromMatrixRoomId(roomId: string): string { + return roomId.split(':').pop() || 'unknown'; +} + +/** + * Extracts the origin server domain from a Matrix user ID. + * @example extractOriginFromMatrixUserId('@user:matrix.org') // 'matrix.org' + */ +export function extractOriginFromMatrixUserId(userId: string): string { + return userId.split(':').pop() || 'unknown'; +} + +// File types for message type detection +const fileTypes = ['m.image', 'm.video', 'm.audio', 'm.file']; + +/** + * Determines the message type from a Matrix event for metrics labeling. + * @returns 'text' | 'file' | 'encrypted' + */ +export function determineMessageType(event: { + type?: string; + content?: { msgtype?: string }; +}): 'text' | 'file' | 'encrypted' { + if (event.type === 'm.room.encrypted') { + return 'encrypted'; + } + + const msgtype = event.content?.msgtype; + if (msgtype && fileTypes.includes(msgtype)) { + return 'file'; + } + + return 'text'; +} + +/** + * Bucketizes PDU count for metrics labeling to avoid high cardinality. + * Groups counts into buckets: 0, 1, 2-5, 6-10, 11-50, 51+ + */ +export function bucketizePduCount(count: number): string { + if (count === 0) return '0'; + if (count === 1) return '1'; + if (count <= 5) return '2-5'; + if (count <= 10) return '6-10'; + if (count <= 50) return '11-50'; + return '51+'; +} + +/** + * Bucketizes EDU count for metrics labeling to avoid high cardinality. + * Groups counts into buckets: 0, 1, 2-5, 6-10, 11+ + */ +export function bucketizeEduCount(count: number): string { + if (count === 0) return '0'; + if (count === 1) return '1'; + if (count <= 5) return '2-5'; + if (count <= 10) return '6-10'; + return '11+'; +} + +/** + * Maps event emitter event types to simplified event_type labels for metrics. + */ +export function getEventTypeLabel(event: string): string { + // Map homeserver.matrix.* events to simplified labels + const mapping: Record = { + 'homeserver.matrix.message': 'message', + 'homeserver.matrix.encrypted': 'message', + 'homeserver.matrix.membership': 'membership', + 'homeserver.matrix.redaction': 'redaction', + 'homeserver.matrix.reaction': 'reaction', + 'homeserver.matrix.typing': 'typing', + 'homeserver.matrix.presence': 'presence', + 'homeserver.matrix.room.create': 'room_create', + 'homeserver.matrix.room.name': 'room_update', + 'homeserver.matrix.room.topic': 'room_update', + 'homeserver.matrix.room.power_levels': 'room_update', + 'homeserver.matrix.room.server_acl': 'room_update', + 'homeserver.matrix.room.role': 'role_change', + 'homeserver.matrix.encryption': 'encryption', + }; + return mapping[event] || event.replace('homeserver.matrix.', ''); +} diff --git a/packages/federation-sdk/src/metrics/index.ts b/packages/federation-sdk/src/metrics/index.ts new file mode 100644 index 00000000..4c36b08e --- /dev/null +++ b/packages/federation-sdk/src/metrics/index.ts @@ -0,0 +1,131 @@ +import client, { type Registry } from 'prom-client'; + +let registry: Registry = client.register; + +export function initMetrics(opts: { registry: Registry }) { + registry = opts.registry; +} + +const percentiles = [0.01, 0.1, 0.5, 0.9, 0.95, 0.99, 1]; + +/** + * Gets an existing metric from the registry or creates it if it doesn't exist. + * This ensures we don't get duplicate registration errors when the SDK + * is used alongside other apps that also register metrics. + */ +function getOrCreateMetric( + name: string, + createFn: () => T, +): T { + const existing = registry.getSingleMetric(name); + if (existing) { + return existing as T; + } + return createFn(); +} + +/** + * Federation metrics for incoming operations. + */ +export const federationMetrics = { + /** Counter for federation events processed */ + get federationEventsProcessed() { + return getOrCreateMetric( + 'rocketchat_federation_events_processed', + () => + new client.Counter({ + name: 'rocketchat_federation_events_processed', + labelNames: ['event_type', 'direction'], + help: 'Total federation events processed', + registers: [registry], + }), + ); + }, + + /** Counter for failed federation events */ + get federationEventsFailed() { + return getOrCreateMetric( + 'rocketchat_federation_events_failed', + () => + new client.Counter({ + name: 'rocketchat_federation_events_failed', + labelNames: ['event_type', 'direction', 'error_type'], + help: 'Total federation events that failed to process', + registers: [registry], + }), + ); + }, + + /** Counter for messages received from other federated servers */ + get federatedMessagesReceived() { + return getOrCreateMetric( + 'rocketchat_federation_messages_received', + () => + new client.Counter({ + name: 'rocketchat_federation_messages_received', + labelNames: ['message_type', 'origin'], + help: 'Total federated messages received', + registers: [registry], + }), + ); + }, + + /** Counter for rooms joined */ + get federatedRoomsJoined() { + return getOrCreateMetric( + 'rocketchat_federation_rooms_joined', + () => + new client.Counter({ + name: 'rocketchat_federation_rooms_joined', + labelNames: ['origin'], + help: 'Total federated rooms joined', + registers: [registry], + }), + ); + }, + + /** Duration to process incoming federation transaction */ + get federationTransactionProcessDuration() { + return getOrCreateMetric( + 'rocketchat_federation_transaction_process_duration_seconds', + () => + new client.Summary({ + name: 'rocketchat_federation_transaction_process_duration_seconds', + labelNames: ['pdu_count', 'edu_count', 'origin'], + help: 'Time to process incoming federation transaction', + percentiles, + registers: [registry], + }), + ); + }, + + /** Duration to process incoming federated message */ + get federationIncomingMessageProcessDuration() { + return getOrCreateMetric( + 'rocketchat_federation_incoming_message_process_duration_seconds', + () => + new client.Summary({ + name: 'rocketchat_federation_incoming_message_process_duration_seconds', + labelNames: ['message_type'], + help: 'Time to process incoming federated message', + percentiles, + registers: [registry], + }), + ); + }, + + /** Duration to join a federated room */ + get federationRoomJoinDuration() { + return getOrCreateMetric( + 'rocketchat_federation_room_join_duration_seconds', + () => + new client.Summary({ + name: 'rocketchat_federation_room_join_duration_seconds', + labelNames: ['origin'], + help: 'Time to join a federated room (invite acceptance)', + percentiles, + registers: [registry], + }), + ); + }, +}; diff --git a/packages/federation-sdk/src/services/event-emitter.service.ts b/packages/federation-sdk/src/services/event-emitter.service.ts index f03fd2ba..99b5fa0b 100644 --- a/packages/federation-sdk/src/services/event-emitter.service.ts +++ b/packages/federation-sdk/src/services/event-emitter.service.ts @@ -8,8 +8,26 @@ import { import { singleton } from 'tsyringe'; import type { HomeserverEventSignatures } from '..'; +import { federationMetrics } from '../metrics'; +import { + determineMessageType, + extractOriginFromMatrixRoomId, + extractOriginFromMatrixUserId, + getEventTypeLabel, +} from '../metrics/helpers'; import { extractEventEmitterAttributes } from '../utils/tracing'; +/** + * Exception handler type for event handlers. + * Called when an event handler throws an error, after metrics are recorded. + * The original error is always re-thrown after this handler completes. + */ +export type EventHandlerExceptionHandler = ( + error: unknown, + event: string, + data: unknown, +) => void | Promise; + @singleton() export class EventEmitterService { private emitter: AsyncDispatcher = @@ -43,47 +61,144 @@ export class EventEmitterService { event: K, handler: EventHandlerOf, handlerType: 'on' | 'once', + onError?: EventHandlerExceptionHandler, ): (data: EventOf) => Promise { return async ( data: EventOf, ): Promise => { - const currentSpan = trace.getSpan(context.active()); - - // If there's an active span (from emit), create a child span for the handler - if (currentSpan) { - return this.tracer.startActiveSpan( - `homeserver-sdk event handler ${event}`, - { - attributes: { - 'event.type': event as string, - 'handler.type': handlerType, + const startTime = Date.now(); + const eventTypeLabel = getEventTypeLabel(event as string); + + // Extract event data for metrics labels + const eventData = data as Record; + const nestedEvent = eventData?.event as + | Record + | undefined; + + try { + const currentSpan = trace.getSpan(context.active()); + + let result: unknown; + if (currentSpan) { + result = await this.tracer.startActiveSpan( + `homeserver-sdk event handler ${event}`, + { + attributes: { + 'event.type': event as string, + 'handler.type': handlerType, + }, + }, + async (span) => { + try { + return await (handler as (data: unknown) => unknown)(data); + } catch (err) { + span.recordException(err as Error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err instanceof Error ? err.message : String(err), + }); + throw err; + } finally { + span.end(); + } }, - }, - async (span) => { - try { - const result = await (handler as (data: unknown) => unknown)( - data, - ); - return result; - } catch (err) { - span.recordException(err as Error); - span.setStatus({ - code: SpanStatusCode.ERROR, - message: err instanceof Error ? err.message : String(err), - }); - throw err; - } finally { - span.end(); - } - }, + ); + } else { + result = await (handler as (data: unknown) => unknown)(data); + } + + // Record success metrics + federationMetrics.federationEventsProcessed.inc({ + event_type: eventTypeLabel, + direction: 'incoming', + }); + + // Record event-specific metrics + this.recordEventSpecificMetrics( + event as string, + nestedEvent, + startTime, ); - } - // No active span, just call the handler directly - return (handler as (data: unknown) => unknown)(data); + return result; + } catch (err) { + // Record failure metrics + federationMetrics.federationEventsFailed.inc({ + event_type: eventTypeLabel, + direction: 'incoming', + error_type: err instanceof Error ? err.constructor.name : 'Unknown', + }); + + // Call optional exception handler if provided + if (onError) { + try { + await onError(err, event as string, data); + } catch (handlerErr) { + // Log but don't replace the original error + logger.error( + { + msg: 'Exception handler threw an error', + event, + originalError: err, + handlerError: handlerErr, + }, + 'Exception handler failed', + ); + } + } + + throw err; + } }; } + /** + * Records event-specific metrics based on event type. + */ + private recordEventSpecificMetrics( + event: string, + nestedEvent: Record | undefined, + startTime: number, + ): void { + const durationSeconds = (Date.now() - startTime) / 1000; + + if ( + event === 'homeserver.matrix.message' || + event === 'homeserver.matrix.encrypted' + ) { + const messageType = determineMessageType(nestedEvent || {}); + const origin = extractOriginFromMatrixUserId( + String(nestedEvent?.sender || ''), + ); + + federationMetrics.federatedMessagesReceived.inc({ + message_type: messageType, + origin, + }); + + federationMetrics.federationIncomingMessageProcessDuration.observe( + { message_type: messageType }, + durationSeconds, + ); + } + + if (event === 'homeserver.matrix.membership') { + const content = nestedEvent?.content as + | Record + | undefined; + if (content?.membership === 'join') { + const roomId = String(nestedEvent?.room_id || ''); + const origin = extractOriginFromMatrixRoomId(roomId); + + federationMetrics.federatedRoomsJoined.inc({ origin }); + federationMetrics.federationRoomJoinDuration.observe( + { origin }, + durationSeconds, + ); + } + } + } + /** * Generic subscription method that handles handler wrapping, mapping, and registration. */ @@ -91,8 +206,14 @@ export class EventEmitterService { method: 'on' | 'once', event: K, handler: EventHandlerOf, + onError?: EventHandlerExceptionHandler, ): (() => void) | undefined { - const tracedHandler = this.createTracedHandler(event, handler, method); + const tracedHandler = this.createTracedHandler( + event, + handler, + method, + onError, + ); // Get or create the WeakMap for this event let eventHandlers = this.handlerMap.get(event); @@ -162,23 +283,35 @@ export class EventEmitterService { * When the event is emitted, the handler will execute within a span * that continues the context, allowing handlers to add attributes * to the span using addSpanAttributes(). + * + * @param event - The event to subscribe to + * @param handler - The handler function to execute when the event is emitted + * @param onError - Optional exception handler called when the handler throws. + * Called after metrics are recorded but before the error is re-thrown. */ public on( event: K, handler: EventHandlerOf, + onError?: EventHandlerExceptionHandler, ): (() => void) | undefined { - return this.subscribe('on', event, handler); + return this.subscribe('on', event, handler, onError); } /** * Subscribe to an event once with tracing support. * Similar to on(), but automatically unsubscribes after the first event. + * + * @param event - The event to subscribe to + * @param handler - The handler function to execute when the event is emitted + * @param onError - Optional exception handler called when the handler throws. + * Called after metrics are recorded but before the error is re-thrown. */ public once( event: K, handler: EventHandlerOf, + onError?: EventHandlerExceptionHandler, ): (() => void) | undefined { - return this.subscribe('once', event, handler); + return this.subscribe('once', event, handler, onError); } public off( diff --git a/packages/federation-sdk/src/services/event.service.ts b/packages/federation-sdk/src/services/event.service.ts index 5d0b1da8..f4e0fa93 100644 --- a/packages/federation-sdk/src/services/event.service.ts +++ b/packages/federation-sdk/src/services/event.service.ts @@ -27,6 +27,8 @@ import { } from '@rocket.chat/federation-room'; import { delay, inject, singleton } from 'tsyringe'; import type { z } from 'zod'; +import { federationMetrics } from '../metrics'; +import { bucketizeEduCount, bucketizePduCount } from '../metrics/helpers'; import { StagingAreaQueue } from '../queues/staging-area.queue'; import { EventStagingRepository } from '../repositories/event-staging.repository'; import { EventRepository } from '../repositories/event.repository'; @@ -133,27 +135,34 @@ export class EventService { pdus: Pdu[]; edus?: BaseEDU[]; }): Promise { - if (!Array.isArray(pdus)) { - throw new Error('pdus must be an array'); - } + const endTimer = + federationMetrics.federationTransactionProcessDuration.startTimer({ + pdu_count: bucketizePduCount(pdus?.length || 0), + edu_count: bucketizeEduCount(edus?.length || 0), + origin, + }); - if (edus && !Array.isArray(edus)) { - throw new Error('edus must be an array'); - } + try { + if (!Array.isArray(pdus)) { + throw new Error('pdus must be an array'); + } - const totalPdus = pdus.length; - const totalEdus = edus?.length || 0; + if (edus && !Array.isArray(edus)) { + throw new Error('edus must be an array'); + } - if (totalPdus > 50 || totalEdus > 100) { - throw new Error('too-many-events'); - } + const totalPdus = pdus.length; + const totalEdus = edus?.length || 0; - // only one current transaction per origin is allowed - if (this.currentTransactions.has(origin)) { - throw new Error('too-many-concurrent-transactions'); - } + if (totalPdus > 50 || totalEdus > 100) { + throw new Error('too-many-events'); + } + + // only one current transaction per origin is allowed + if (this.currentTransactions.has(origin)) { + throw new Error('too-many-concurrent-transactions'); + } - try { this.currentTransactions.add(origin); // process both PDU and EDU in "parallel" to no block EDUs due to heavy PDU operations @@ -163,6 +172,7 @@ export class EventService { ]); } finally { this.currentTransactions.delete(origin); + endTimer(); } }