From 0fbe536aa4eea53537d049bb91c126720e0fce02 Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 18 Jan 2026 15:30:55 -0300 Subject: [PATCH 1/6] feat: update federation-sdk to include tracing utilities and dependencies --- bun.lock | 11 +- packages/federation-sdk/package.json | 1 + packages/federation-sdk/src/index.ts | 10 + .../repositories/event-staging.repository.ts | 4 +- .../src/repositories/event.repository.ts | 12 +- .../src/repositories/key.repository.ts | 4 +- .../src/repositories/lock.repository.ts | 4 +- .../src/repositories/room.repository.ts | 4 +- .../src/repositories/server.repository.ts | 4 +- .../repositories/state-graph.repository.ts | 2 + .../src/repositories/upload.repository.ts | 6 +- .../src/services/edu.service.ts | 13 +- .../src/services/event-emitter.service.ts | 186 +++++++- .../src/services/event.service.ts | 25 +- .../services/federation-request.service.ts | 34 ++ .../src/services/federation.service.ts | 87 +++- .../src/services/invite.service.ts | 32 +- .../src/services/media.service.ts | 6 + .../src/services/message.service.ts | 162 ++++++- .../src/services/room.service.ts | 73 ++- .../src/services/state.service.ts | 41 +- packages/federation-sdk/src/utils/tracing.ts | 417 ++++++++++++++++++ 22 files changed, 1076 insertions(+), 62 deletions(-) create mode 100644 packages/federation-sdk/src/utils/tracing.ts diff --git a/bun.lock b/bun.lock index 0a6f88c81..86022aee4 100644 --- a/bun.lock +++ b/bun.lock @@ -54,8 +54,9 @@ }, "packages/federation-sdk": { "name": "@rocket.chat/federation-sdk", - "version": "0.3.0", + "version": "0.3.8", "dependencies": { + "@opentelemetry/api": "^1.9.0", "@rocket.chat/emitter": "^0.31.25", "@rocket.chat/federation-core": "workspace:*", "@rocket.chat/federation-crypto": "workspace:*", @@ -147,6 +148,8 @@ "@noble/ed25519": ["@noble/ed25519@3.0.0", "", {}, "sha512-QyteqMNm0GLqfa5SoYbSC3+Pvykwpn95Zgth4MFVSMKBB75ELl9tX1LAVsN4c3HXOrakHsF2gL4zWDAYCcsnzg=="], + "@opentelemetry/api": ["@opentelemetry/api@1.9.0", "", {}, "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg=="], + "@rocket.chat/emitter": ["@rocket.chat/emitter@0.31.25", "", {}, "sha512-hw5BpDlNwpYSb+K5X3DNMNUVEVXxmXugUPetGZGCWvntSVFsOjYuVEypoKW6vBBXSfqCBb0kN1npYcKEb4NFBw=="], "@rocket.chat/federation-core": ["@rocket.chat/federation-core@workspace:packages/core"], @@ -291,7 +294,7 @@ "buffer": ["buffer@6.0.3", "", { "dependencies": { "base64-js": "^1.3.1", "ieee754": "^1.2.1" } }, "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA=="], - "bun-types": ["bun-types@1.3.1", "", { "dependencies": { "@types/node": "*" }, "peerDependencies": { "@types/react": "^19" } }, "sha512-NMrcy7smratanWJ2mMXdpatalovtxVggkj11bScuWuiOoXTiKIu2eVS1/7qbyI/4yHedtsn175n4Sm4JcdHLXw=="], + "bun-types": ["bun-types@1.3.5", "", { "dependencies": { "@types/node": "*" } }, "sha512-inmAYe2PFLs0SUbFOWSVD24sg1jFlMPxOjOSSCYqUgn4Hsc3rDc7dFvfVYjFPNHtov6kgUeulV4SxbuIV/stPw=="], "chalk": ["chalk@5.4.1", "", {}, "sha512-zgVZuo2WcZgfUEmsn6eO3kINexW8RAE4maiQ8QNs8CtpPCSyMiYsULR3HQYkm3w8FIA3SberyMJMSldGsW+U3w=="], @@ -581,10 +584,6 @@ "@jridgewell/trace-mapping/@jridgewell/sourcemap-codec": ["@jridgewell/sourcemap-codec@1.5.4", "", {}, "sha512-VT2+G1VQs/9oz078bLrYbecdZKs912zQlkelYpuf+SXF+QvZDYJlbx/LSx+meSAwdDFnF8FVXW92AVjjkVmgFw=="], - "@rocket.chat/federation-crypto/bun-types": ["bun-types@1.3.3", "", { "dependencies": { "@types/node": "*" } }, "sha512-z3Xwlg7j2l9JY27x5Qn3Wlyos8YAp0kKRlrePAOjgjMGS5IG6E7Jnlx736vH9UVI4wUICwwhC9anYL++XeOgTQ=="], - - "@rocket.chat/federation-room/bun-types": ["bun-types@1.3.3", "", { "dependencies": { "@types/node": "*" } }, "sha512-z3Xwlg7j2l9JY27x5Qn3Wlyos8YAp0kKRlrePAOjgjMGS5IG6E7Jnlx736vH9UVI4wUICwwhC9anYL++XeOgTQ=="], - "@rocket.chat/homeserver/bun-types": ["bun-types@1.2.22", "", { "dependencies": { "@types/node": "*" }, "peerDependencies": { "@types/react": "^19" } }, "sha512-hwaAu8tct/Zn6Zft4U9BsZcXkYomzpHJX28ofvx7k0Zz2HNz54n1n+tDgxoWFGB4PcFvJXJQloPhaV2eP3Q6EA=="], "@scalar/themes/@scalar/types": ["@scalar/types@0.1.7", "", { "dependencies": { "@scalar/openapi-types": "0.2.0", "@unhead/schema": "^1.11.11", "nanoid": "^5.1.5", "type-fest": "^4.20.0", "zod": "^3.23.8" } }, "sha512-irIDYzTQG2KLvFbuTI8k2Pz/R4JR+zUUSykVTbEMatkzMmVFnn1VzNSMlODbadycwZunbnL2tA27AXed9URVjw=="], diff --git a/packages/federation-sdk/package.json b/packages/federation-sdk/package.json index 50fc5eaa3..ac2b12426 100644 --- a/packages/federation-sdk/package.json +++ b/packages/federation-sdk/package.json @@ -16,6 +16,7 @@ "test": "bun test" }, "dependencies": { + "@opentelemetry/api": "^1.9.0", "@rocket.chat/emitter": "^0.31.25", "@rocket.chat/federation-core": "workspace:*", "@rocket.chat/federation-crypto": "workspace:*", diff --git a/packages/federation-sdk/src/index.ts b/packages/federation-sdk/src/index.ts index 427e1d067..5d41bf72d 100644 --- a/packages/federation-sdk/src/index.ts +++ b/packages/federation-sdk/src/index.ts @@ -68,6 +68,16 @@ export { FederationValidationError, } from './services/federation-validation.service'; +// Tracing utilities - compatible with @rocket.chat/tracing +export { + addSpanAttributes, + traced, + tracedClass, + tracerActiveSpan, + hasActiveSpan, + type ITracedClassOptions, +} from './utils/tracing'; + export type HomeserverEventSignatures = { 'homeserver.ping': { message: string; diff --git a/packages/federation-sdk/src/repositories/event-staging.repository.ts b/packages/federation-sdk/src/repositories/event-staging.repository.ts index dd5f3fdf0..984918518 100644 --- a/packages/federation-sdk/src/repositories/event-staging.repository.ts +++ b/packages/federation-sdk/src/repositories/event-staging.repository.ts @@ -1,9 +1,11 @@ import { generateId } from '@rocket.chat/federation-core'; import type { EventStagingStore } from '@rocket.chat/federation-core'; -import { type EventID, Pdu, RoomID } from '@rocket.chat/federation-room'; +import { type EventID, type Pdu, type RoomID } from '@rocket.chat/federation-room'; import type { Collection, DeleteResult, UpdateResult } from 'mongodb'; import { inject, singleton } from 'tsyringe'; +import { tracedClass } from '../utils/tracing'; +@tracedClass({ type: 'repository', className: 'EventStagingRepository' }) @singleton() export class EventStagingRepository { constructor( diff --git a/packages/federation-sdk/src/repositories/event.repository.ts b/packages/federation-sdk/src/repositories/event.repository.ts index a9bdb55a3..ba0c8bba3 100644 --- a/packages/federation-sdk/src/repositories/event.repository.ts +++ b/packages/federation-sdk/src/repositories/event.repository.ts @@ -2,12 +2,12 @@ import { generateId } from '@rocket.chat/federation-core'; import type { EventStore } from '@rocket.chat/federation-core'; import { type EventID, - Pdu, - PduForType, - PduType, + type Pdu, + type PduForType, + type PduType, RejectCode, - RoomID, - StateID, + type RoomID, + type StateID, } from '@rocket.chat/federation-room'; import type { Collection, @@ -18,7 +18,9 @@ import type { WithId, } from 'mongodb'; import { inject, singleton } from 'tsyringe'; +import { tracedClass } from '../utils/tracing'; +@tracedClass({ type: 'repository', className: 'EventRepository' }) @singleton() export class EventRepository { constructor( diff --git a/packages/federation-sdk/src/repositories/key.repository.ts b/packages/federation-sdk/src/repositories/key.repository.ts index 1ef3cd295..5c7f1586b 100644 --- a/packages/federation-sdk/src/repositories/key.repository.ts +++ b/packages/federation-sdk/src/repositories/key.repository.ts @@ -1,5 +1,6 @@ -import { Collection } from 'mongodb'; +import type { Collection } from 'mongodb'; import { inject, singleton } from 'tsyringe'; +import { tracedClass } from '../utils/tracing'; export type Key = { origin: string; @@ -8,6 +9,7 @@ export type Key = { valid_until: Date; }; +@tracedClass({ type: 'repository', className: 'KeyRepository' }) @singleton() export class KeyRepository { constructor( diff --git a/packages/federation-sdk/src/repositories/lock.repository.ts b/packages/federation-sdk/src/repositories/lock.repository.ts index 4fe294377..908c5ccfe 100644 --- a/packages/federation-sdk/src/repositories/lock.repository.ts +++ b/packages/federation-sdk/src/repositories/lock.repository.ts @@ -1,5 +1,6 @@ -import { Collection } from 'mongodb'; +import type { Collection } from 'mongodb'; import { inject, singleton } from 'tsyringe'; +import { tracedClass } from '../utils/tracing'; export type Lock = { roomId: string; @@ -7,6 +8,7 @@ export type Lock = { lockedAt: Date; }; +@tracedClass({ type: 'repository', className: 'LockRepository' }) @singleton() export class LockRepository { constructor( diff --git a/packages/federation-sdk/src/repositories/room.repository.ts b/packages/federation-sdk/src/repositories/room.repository.ts index 1abe06920..09a700a13 100644 --- a/packages/federation-sdk/src/repositories/room.repository.ts +++ b/packages/federation-sdk/src/repositories/room.repository.ts @@ -1,7 +1,8 @@ import type { EventBase } from '@rocket.chat/federation-core'; import type { EventID } from '@rocket.chat/federation-room'; -import { Collection } from 'mongodb'; +import type { Collection } from 'mongodb'; import { inject, singleton } from 'tsyringe'; +import { tracedClass } from '../utils/tracing'; export type Room = { _id: string; @@ -16,6 +17,7 @@ export type Room = { }; }; +@tracedClass({ type: 'repository', className: 'RoomRepository' }) @singleton() export class RoomRepository { constructor( diff --git a/packages/federation-sdk/src/repositories/server.repository.ts b/packages/federation-sdk/src/repositories/server.repository.ts index 57defef91..094a38ebe 100644 --- a/packages/federation-sdk/src/repositories/server.repository.ts +++ b/packages/federation-sdk/src/repositories/server.repository.ts @@ -1,5 +1,6 @@ -import { Collection } from 'mongodb'; +import type { Collection } from 'mongodb'; import { inject, singleton } from 'tsyringe'; +import { tracedClass } from '../utils/tracing'; export type Server = { name: string; @@ -11,6 +12,7 @@ export type Server = { }; }; +@tracedClass({ type: 'repository', className: 'ServerRepository' }) @singleton() export class ServerRepository { constructor( diff --git a/packages/federation-sdk/src/repositories/state-graph.repository.ts b/packages/federation-sdk/src/repositories/state-graph.repository.ts index 3e0bf36fc..2a592f4e8 100644 --- a/packages/federation-sdk/src/repositories/state-graph.repository.ts +++ b/packages/federation-sdk/src/repositories/state-graph.repository.ts @@ -8,6 +8,7 @@ import { } from '@rocket.chat/federation-room'; import { type Collection, ObjectId } from 'mongodb'; import { inject, singleton } from 'tsyringe'; +import { tracedClass } from '../utils/tracing'; export type StateGraphStore = { _id: StateID; @@ -25,6 +26,7 @@ export type StateGraphStore = { partial: boolean; }; +@tracedClass({ type: 'repository', className: 'StateGraphRepository' }) @singleton() export class StateGraphRepository { constructor( diff --git a/packages/federation-sdk/src/repositories/upload.repository.ts b/packages/federation-sdk/src/repositories/upload.repository.ts index d67ea7458..9cecf9d6f 100644 --- a/packages/federation-sdk/src/repositories/upload.repository.ts +++ b/packages/federation-sdk/src/repositories/upload.repository.ts @@ -1,6 +1,7 @@ -import { RoomID } from '@rocket.chat/federation-room'; -import { Collection } from 'mongodb'; +import type { RoomID } from '@rocket.chat/federation-room'; +import type { Collection } from 'mongodb'; import { inject, singleton } from 'tsyringe'; +import { tracedClass } from '../utils/tracing'; export type Upload = { rid: string; @@ -12,6 +13,7 @@ export type Upload = { }; }; +@tracedClass({ type: 'repository', className: 'UploadRepository' }) @singleton() export class UploadRepository { constructor( diff --git a/packages/federation-sdk/src/services/edu.service.ts b/packages/federation-sdk/src/services/edu.service.ts index 013cefb97..790084d11 100644 --- a/packages/federation-sdk/src/services/edu.service.ts +++ b/packages/federation-sdk/src/services/edu.service.ts @@ -4,12 +4,14 @@ import { createTypingEDU, } from '@rocket.chat/federation-core'; import { createLogger } from '@rocket.chat/federation-core'; -import { RoomID } from '@rocket.chat/federation-room'; +import { type RoomID } from '@rocket.chat/federation-room'; import { singleton } from 'tsyringe'; +import { traced, tracedClass } from '../utils/tracing'; import { ConfigService } from './config.service'; import { FederationService } from './federation.service'; import { StateService } from './state.service'; +@tracedClass({ type: 'service', className: 'EduService' }) @singleton() export class EduService { private readonly logger = createLogger('EduService'); @@ -20,6 +22,11 @@ export class EduService { private readonly stateService: StateService, ) {} + @traced((roomId: RoomID, userId: string, typing: boolean) => ({ + roomId, + userId, + typing, + })) async sendTypingNotification( roomId: RoomID, userId: string, @@ -50,6 +57,10 @@ export class EduService { } } + @traced((presenceUpdates: PresenceUpdate[], roomIds: RoomID[]) => ({ + presenceUpdateCount: presenceUpdates?.length, + roomCount: roomIds?.length, + })) async sendPresenceUpdateToRooms( presenceUpdates: PresenceUpdate[], roomIds: RoomID[], diff --git a/packages/federation-sdk/src/services/event-emitter.service.ts b/packages/federation-sdk/src/services/event-emitter.service.ts index af480e440..af89fe47c 100644 --- a/packages/federation-sdk/src/services/event-emitter.service.ts +++ b/packages/federation-sdk/src/services/event-emitter.service.ts @@ -5,43 +5,217 @@ import { logger, } from '@rocket.chat/federation-core'; import { singleton } from 'tsyringe'; +import { SpanStatusCode, context, trace } from '@opentelemetry/api'; -import { Emitter } from '@rocket.chat/emitter'; import type { HomeserverEventSignatures } from '..'; +import { extractEventEmitterAttributes } from '../utils/tracing'; @singleton() export class EventEmitterService { private emitter: AsyncDispatcher = new AsyncDispatcher(); + private tracer = trace.getTracer('@rocket.chat/federation-sdk'); + + /** + * Maps event -> WeakMap. + * This structure allows: + * - Tracking wrappers per event (so same handler on different events doesn't conflict) + * - Multiple wrappers per handler (so same handler subscribed multiple times works) + * - Deterministic removal (FIFO - first subscribed wrapper is removed first) + * Using WeakMap for handler->wrappers to avoid memory leaks when handlers are GC'd. + */ + private handlerMap = new Map< + keyof HomeserverEventSignatures, + WeakMap< + // biome-ignore lint/suspicious/noExplicitAny: Handler functions have varying signatures + (...args: any[]) => any, + // biome-ignore lint/suspicious/noExplicitAny: Handler functions have varying signatures + Array<(...args: any[]) => any> + > + >(); + + /** + * Creates a traced handler wrapper that executes the original handler + * within a span context for tracing purposes. + */ + private createTracedHandler( + event: K, + handler: EventHandlerOf, + handlerType: 'on' | 'once', + ): (data: EventOf) => Promise { + return async ( + data: EventOf, + ): Promise => { + const currentSpan = trace.getSpan(context.active()); + + // If there's an active span (from emit), create a child span for the handler + if (currentSpan) { + return this.tracer.startActiveSpan( + `homeserver-sdk event handler ${event}`, + { + attributes: { + 'event.type': event as string, + 'handler.type': handlerType, + }, + }, + async (span) => { + try { + const result = await (handler as (data: unknown) => unknown)( + data, + ); + return result; + } catch (err) { + span.recordException(err as Error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err instanceof Error ? err.message : String(err), + }); + throw err; + } finally { + span.end(); + } + }, + ); + } + + // No active span, just call the handler directly + return (handler as (data: unknown) => unknown)(data); + }; + } + + /** + * Generic subscription method that handles handler wrapping, mapping, and registration. + */ + private subscribe( + method: 'on' | 'once', + event: K, + handler: EventHandlerOf, + ): (() => void) | undefined { + const tracedHandler = this.createTracedHandler(event, handler, method); + + // Get or create the WeakMap for this event + let eventHandlers = this.handlerMap.get(event); + if (!eventHandlers) { + eventHandlers = new WeakMap(); + this.handlerMap.set(event, eventHandlers); + } + + // Get or create the array of wrappers for this handler + let wrappers = eventHandlers.get(handler); + if (!wrappers) { + wrappers = []; + eventHandlers.set(handler, wrappers); + } + + // Add the new traced wrapper to the array + wrappers.push(tracedHandler); + + return this.emitter[method]( + event, + tracedHandler as EventHandlerOf, + ); + } + public async emit( event: K, ...[data]: EventOf extends void ? [undefined?] : [EventOf] ): Promise { - await this.emitter.emit(event, ...([data] as any)); - logger.debug({ msg: `Event emitted: ${event}`, event, data }); + const currentSpan = trace.getSpan(context.active()); + + // If there's an active span, emit within a traced context + if (currentSpan) { + const attributes = extractEventEmitterAttributes(event as string, data); + + await this.tracer.startActiveSpan( + `homeserver-sdk event emit ${event}`, + { attributes }, + async (span) => { + try { + // biome-ignore lint/suspicious/noExplicitAny: Complex type inference with event data spreading + await this.emitter.emit(event, ...([data] as any)); + logger.debug({ msg: `Event emitted: ${event}`, event, data }); + } catch (err) { + span.recordException(err as Error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err instanceof Error ? err.message : String(err), + }); + throw err; + } finally { + span.end(); + } + }, + ); + } else { + // No active span, emit without tracing + // biome-ignore lint/suspicious/noExplicitAny: Complex type inference with event data spreading + await this.emitter.emit(event, ...([data] as any)); + logger.debug({ msg: `Event emitted: ${event}`, event, data }); + } } + /** + * Subscribe to an event with tracing support. + * When the event is emitted, the handler will execute within a span + * that continues the context, allowing handlers to add attributes + * to the span using addSpanAttributes(). + */ public on( event: K, handler: EventHandlerOf, ): (() => void) | undefined { - return this.emitter.on(event, handler); + return this.subscribe('on', event, handler); } + /** + * Subscribe to an event once with tracing support. + * Similar to on(), but automatically unsubscribes after the first event. + */ public once( event: K, handler: EventHandlerOf, ): (() => void) | undefined { - return this.emitter.once(event, handler); + return this.subscribe('once', event, handler); } public off( event: K, handler: EventHandlerOf, ): void { - this.emitter.off(event, handler); + // Look up the event's handler map + const eventHandlers = this.handlerMap.get(event); + if (!eventHandlers) { + // Fallback: try with the original handler in case it was registered directly + this.emitter.off(event, handler); + return; + } + + // Look up the wrappers for this handler + const wrappers = eventHandlers.get(handler); + if (!wrappers || wrappers.length === 0) { + // Fallback: try with the original handler in case it was registered directly + this.emitter.off(event, handler); + return; + } + + // Remove the first wrapper (FIFO - first subscribed is first removed) + const wrappedHandler = wrappers.shift(); + if (wrappedHandler) { + this.emitter.off( + event, + wrappedHandler as EventHandlerOf, + ); + } + + // Clean up empty arrays from the WeakMap + if (wrappers.length === 0) { + eventHandlers.delete(handler); + } + + // Note: We don't remove empty WeakMaps from handlerMap since we can't + // check if a WeakMap is empty. This is fine since event keys are finite. } } diff --git a/packages/federation-sdk/src/services/event.service.ts b/packages/federation-sdk/src/services/event.service.ts index 587d7965a..5d0b1da87 100644 --- a/packages/federation-sdk/src/services/event.service.ts +++ b/packages/federation-sdk/src/services/event.service.ts @@ -19,9 +19,9 @@ import { type PduForType, type PduType, PersistentEventFactory, - RoomID, + type RoomID, RoomState, - RoomVersion, + type RoomVersion, type State, getAuthChain, } from '@rocket.chat/federation-room'; @@ -32,6 +32,7 @@ import { EventStagingRepository } from '../repositories/event-staging.repository import { EventRepository } from '../repositories/event.repository'; import { LockRepository } from '../repositories/lock.repository'; import { eventSchemas } from '../utils/event-schemas'; +import { traced, tracedClass } from '../utils/tracing'; import { ConfigService } from './config.service'; import { EventEmitterService } from './event-emitter.service'; import { ServerService } from './server.service'; @@ -42,6 +43,7 @@ export interface AuthEventParams { senderId: string; } +@tracedClass({ type: 'service', className: 'EventService' }) @singleton() export class EventService { private readonly logger = createLogger('EventService'); @@ -63,6 +65,10 @@ export class EventService { private readonly lockRepository: LockRepository, ) {} + @traced((eventId: EventID, type?: PduType) => ({ + eventId, + eventType: type, + })) async getEventById>>( eventId: EventID, type?: T, @@ -74,6 +80,9 @@ export class EventService { return (this.eventRepository.findById(eventId) ?? null) as Promise

; } + @traced((eventIds: EventID[]) => ({ + eventCount: eventIds?.length, + })) async checkIfEventsExists( eventIds: EventID[], ): Promise<{ missing: EventID[]; found: EventID[] }> { @@ -110,6 +119,11 @@ export class EventService { await this.eventStagingRepository.removeByEventId(event._id); } + @traced((params: { origin: string; pdus: Pdu[]; edus?: unknown[] }) => ({ + origin: params?.origin, + pduCount: params?.pdus?.length, + eduCount: params?.edus?.length, + })) async processIncomingTransaction({ origin, pdus, @@ -152,6 +166,10 @@ export class EventService { } } + @traced((origin: string, pdus: Pdu[]) => ({ + origin, + pduCount: pdus?.length, + })) async processIncomingPDUs(origin: string, pdus: Pdu[]): Promise { // organize events by room id const eventsByRoomId = new Map(); @@ -286,6 +304,9 @@ export class EventService { }); } + @traced((edus: BaseEDU[]) => ({ + eduCount: edus?.length, + })) private async processIncomingEDUs(edus: BaseEDU[]): Promise { this.logger.debug(`Processing ${edus.length} incoming EDUs`); diff --git a/packages/federation-sdk/src/services/federation-request.service.ts b/packages/federation-sdk/src/services/federation-request.service.ts index 41b4b24a6..ea7084eda 100644 --- a/packages/federation-sdk/src/services/federation-request.service.ts +++ b/packages/federation-sdk/src/services/federation-request.service.ts @@ -15,6 +15,7 @@ import { import { singleton } from 'tsyringe'; import * as nacl from 'tweetnacl'; import { getHomeserverFinalAddress } from '../server-discovery/discovery'; +import { traced, tracedClass } from '../utils/tracing'; import { ConfigService } from './config.service'; interface SignedRequest { @@ -48,12 +49,20 @@ export class SelfServerFetchError extends Error { } } +@tracedClass({ type: 'service', className: 'FederationRequestService' }) @singleton() export class FederationRequestService { private readonly logger = createLogger('FederationRequestService'); constructor(private readonly configService: ConfigService) {} + @traced( + (params: { method: string; domain: string; uri: string }) => ({ + method: params?.method, + targetDomain: params?.domain, + uri: params?.uri, + }), + ) async makeSignedRequest({ method, domain, @@ -137,6 +146,11 @@ export class FederationRequestService { return response; } + @traced((method: string, targetServer: string, endpoint: string) => ({ + method, + targetServer, + endpoint, + })) async request( method: HttpMethod, targetServer: string, @@ -177,6 +191,11 @@ export class FederationRequestService { ).json(); } + @traced((targetServer: string, endpoint: string) => ({ + method: 'GET', + targetServer, + endpoint, + })) async get( targetServer: string, endpoint: string, @@ -191,6 +210,11 @@ export class FederationRequestService { ); } + @traced((targetServer: string, endpoint: string) => ({ + method: 'PUT', + targetServer, + endpoint, + })) async put( targetServer: string, endpoint: string, @@ -200,6 +224,11 @@ export class FederationRequestService { return this.request('PUT', targetServer, endpoint, body, queryParams); } + @traced((targetServer: string, endpoint: string) => ({ + method: 'POST', + targetServer, + endpoint, + })) async post( targetServer: string, endpoint: string, @@ -209,6 +238,11 @@ export class FederationRequestService { return this.request('POST', targetServer, endpoint, body, queryParams); } + @traced((method: string, targetServer: string, endpoint: string) => ({ + method, + targetServer, + endpoint, + })) async requestBinaryData( method: string, targetServer: string, diff --git a/packages/federation-sdk/src/services/federation.service.ts b/packages/federation-sdk/src/services/federation.service.ts index fc7e7f860..d944e0006 100644 --- a/packages/federation-sdk/src/services/federation.service.ts +++ b/packages/federation-sdk/src/services/federation.service.ts @@ -2,10 +2,11 @@ import type { EventBase } from '@rocket.chat/federation-core'; import type { BaseEDU } from '@rocket.chat/federation-core'; import { createLogger } from '@rocket.chat/federation-core'; import { - EventID, - Pdu, - PersistentEventBase, + type EventID, + type Pdu, + type PersistentEventBase, PersistentEventFactory, + type RoomVersion, extractDomainFromId, } from '@rocket.chat/federation-room'; import { singleton } from 'tsyringe'; @@ -17,10 +18,12 @@ import { type Transaction, type Version, } from '../specs/federation-api'; +import { traced, tracedClass } from '../utils/tracing'; import { ConfigService } from './config.service'; import { FederationRequestService } from './federation-request.service'; import { StateService } from './state.service'; +@tracedClass({ type: 'service', className: 'FederationService' }) @singleton() export class FederationService { private readonly logger = createLogger('FederationService'); @@ -34,6 +37,12 @@ export class FederationService { /** * Get a make_join template for a room and user */ + @traced((domain: string, roomId: string, userId: string, version?: string) => ({ + targetDomain: domain, + roomId, + userId, + version, + })) async makeJoin( domain: string, roomId: string, @@ -64,6 +73,11 @@ export class FederationService { /** * Send a join event to a remote server */ + @traced((joinEvent: PersistentEventBase, omitMembers?: boolean) => ({ + eventId: joinEvent?.eventId, + roomId: joinEvent?.roomId, + omitMembers, + })) async sendJoin( joinEvent: PersistentEventBase, omitMembers = false, @@ -98,6 +112,11 @@ export class FederationService { } } + @traced((domain: string, roomId: string, userId: string) => ({ + targetDomain: domain, + roomId, + userId, + })) async makeLeave( domain: string, roomId: string, @@ -115,6 +134,10 @@ export class FederationService { } } + @traced((leaveEvent: PersistentEventBase) => ({ + eventId: leaveEvent?.eventId, + roomId: leaveEvent?.roomId, + })) async sendLeave(leaveEvent: PersistentEventBase): Promise { try { const uri = FederationEndpoints.sendLeave( @@ -145,6 +168,13 @@ export class FederationService { /** * Send a transaction to a remote server */ + @traced( + (domain: string, transaction: { pdus?: unknown[]; edus?: unknown[] }) => ({ + targetDomain: domain, + pduCount: transaction?.pdus?.length, + eduCount: transaction?.edus?.length, + }), + ) async sendTransaction( domain: string, transaction: Transaction, @@ -167,6 +197,11 @@ export class FederationService { /** * Send an event to a remote server */ + @traced((domain: string, event: Pdu) => ({ + targetDomain: domain, + eventType: event?.type, + roomId: event?.room_id, + })) async sendEvent( domain: string, event: T, @@ -188,6 +223,10 @@ export class FederationService { /** * Get events from a remote server */ + @traced((domain: string, eventId: string) => ({ + targetDomain: domain, + eventId, + })) async getEvent(domain: string, eventId: string): Promise { try { const uri = FederationEndpoints.getEvent(eventId); @@ -201,6 +240,21 @@ export class FederationService { /** * Get events from a remote server */ + @traced( + ( + domain: string, + roomId: string, + earliestEvents: EventID[], + latestEvents: EventID[], + limit?: number, + ) => ({ + targetDomain: domain, + roomId, + earliestEventCount: earliestEvents?.length, + latestEventCount: latestEvents?.length, + limit, + }), + ) async getMissingEvents( domain: string, roomId: string, @@ -226,6 +280,11 @@ export class FederationService { /** * Get state for a room from remote server */ + @traced((domain: string, roomId: string, eventId: string) => ({ + targetDomain: domain, + roomId, + eventId, + })) async getState( domain: string, roomId: string, @@ -245,6 +304,10 @@ export class FederationService { /** * Get state IDs for a room from remote server */ + @traced((domain: string, roomId: string) => ({ + targetDomain: domain, + roomId, + })) async getStateIds(domain: string, roomId: string): Promise { try { const uri = FederationEndpoints.getStateIds(roomId); @@ -258,6 +321,9 @@ export class FederationService { /** * Get server version information */ + @traced((domain: string) => ({ + targetDomain: domain, + })) async getVersion(domain: string): Promise { try { return await this.requestService.get( @@ -271,6 +337,12 @@ export class FederationService { } // invite user from another homeserver to our homeserver + @traced((inviteEvent: PersistentEventBase, roomVersion: string) => ({ + eventId: inviteEvent?.eventId, + roomId: inviteEvent?.roomId, + targetUser: inviteEvent?.stateKey, + roomVersion, + })) async inviteUser(inviteEvent: PersistentEventBase, roomVersion: string) { const uri = FederationEndpoints.inviteV2( inviteEvent.roomId, @@ -301,6 +373,11 @@ export class FederationService { }); } + @traced((event: PersistentEventBase) => ({ + eventId: event?.eventId, + eventType: event?.type, + roomId: event?.roomId, + })) async sendEventToAllServersInRoom(event: PersistentEventBase) { const servers = await this.stateService.getServerSetInRoom(event.roomId); @@ -352,6 +429,10 @@ export class FederationService { } } + @traced((edus: BaseEDU[], servers: string[]) => ({ + eduCount: edus?.length, + serverCount: servers?.length, + })) async sendEDUToServers(edus: BaseEDU[], servers: string[]): Promise { // Process servers sequentially to avoid concurrent transactions per Matrix spec for (const server of servers) { diff --git a/packages/federation-sdk/src/services/invite.service.ts b/packages/federation-sdk/src/services/invite.service.ts index 056dd0a82..6732aae75 100644 --- a/packages/federation-sdk/src/services/invite.service.ts +++ b/packages/federation-sdk/src/services/invite.service.ts @@ -1,16 +1,17 @@ import { createLogger } from '@rocket.chat/federation-core'; import { - EventID, - PduForType, - PersistentEventBase, + type EventID, + type PduForType, + type PersistentEventBase, PersistentEventFactory, - RoomID, - RoomVersion, - UserID, + type RoomID, + type RoomVersion, + type UserID, extractDomainFromId, } from '@rocket.chat/federation-room'; import { delay, inject, singleton } from 'tsyringe'; import { EventRepository } from '../repositories/event.repository'; +import { traced, tracedClass } from '../utils/tracing'; import { ConfigService } from './config.service'; import { EventAuthorizationService } from './event-authorization.service'; import { EventEmitterService } from './event-emitter.service'; @@ -24,6 +25,7 @@ export class NotAllowedError extends Error { } } +@tracedClass({ type: 'service', className: 'InviteService' }) @singleton() export class InviteService { private readonly logger = createLogger('InviteService'); @@ -43,6 +45,14 @@ export class InviteService { /** * Invite a user to an existing room */ + @traced( + (userId: UserID, roomId: RoomID, sender: UserID, isDirectMessage?: boolean) => ({ + userId, + roomId, + sender, + isDirectMessage, + }), + ) async inviteUserToRoom( userId: UserID, roomId: RoomID, @@ -181,6 +191,16 @@ export class InviteService { } } + @traced( + ( + _event: unknown, + eventId: EventID, + roomVersion: RoomVersion, + ) => ({ + eventId, + roomVersion, + }), + ) async processInvite( event: PduForType<'m.room.member'>, eventId: EventID, diff --git a/packages/federation-sdk/src/services/media.service.ts b/packages/federation-sdk/src/services/media.service.ts index dc9ff04ef..6e86fc5b8 100644 --- a/packages/federation-sdk/src/services/media.service.ts +++ b/packages/federation-sdk/src/services/media.service.ts @@ -1,8 +1,10 @@ import { createLogger } from '@rocket.chat/federation-core'; import { singleton } from 'tsyringe'; +import { traced, tracedClass } from '../utils/tracing'; import { ConfigService } from './config.service'; import { FederationRequestService } from './federation-request.service'; +@tracedClass({ type: 'service', className: 'MediaService' }) @singleton() export class MediaService { private readonly logger = createLogger('MediaService'); @@ -12,6 +14,10 @@ export class MediaService { private readonly federationRequest: FederationRequestService, ) {} + @traced((serverName: string, mediaId: string) => ({ + serverName, + mediaId, + })) async downloadFromRemoteServer( serverName: string, mediaId: string, diff --git a/packages/federation-sdk/src/services/message.service.ts b/packages/federation-sdk/src/services/message.service.ts index c51c40e87..9779642dd 100644 --- a/packages/federation-sdk/src/services/message.service.ts +++ b/packages/federation-sdk/src/services/message.service.ts @@ -3,10 +3,11 @@ import { createLogger } from '@rocket.chat/federation-core'; import { type EventID, type PersistentEventBase, - RoomID, - UserID, + type RoomID, + type UserID, } from '@rocket.chat/federation-room'; import { singleton } from 'tsyringe'; +import { addSpanAttributes, traced, tracedClass } from '../utils/tracing'; import { EventService } from './event.service'; import { FederationService } from './federation.service'; import { RoomService } from './room.service'; @@ -48,6 +49,7 @@ export type FileMessageContent = { }; }; +@tracedClass({ type: 'service', className: 'MessageService' }) @singleton() export class MessageService { private readonly logger = createLogger('MessageService'); @@ -103,6 +105,20 @@ export class MessageService { } } + @traced( + ( + roomId: RoomID, + rawMessage: string, + _formattedMessage: string, + senderUserId: UserID, + reply?: Reply, + ) => ({ + roomId, + senderUserId, + hasReply: Boolean(reply), + messageLength: rawMessage?.length, + }), + ) async sendMessage( roomId: RoomID, rawMessage: string, @@ -137,6 +153,12 @@ export class MessageService { roomVersion, ); + // Add runtime attributes after event is created + addSpanAttributes({ + eventId: event.eventId, + roomVersion, + }); + await this.stateService.handlePdu(event); if (event.rejected) { throw new Error(event.rejectReason); @@ -151,6 +173,20 @@ export class MessageService { * * @deprecated Use sendMessage and replyToEventId instead */ + @traced( + ( + roomId: RoomID, + rawMessage: string, + _formattedMessage: string, + eventToReplyTo: EventID, + senderUserId: UserID, + ) => ({ + roomId, + senderUserId, + eventToReplyTo, + messageLength: rawMessage?.length, + }), + ) async sendReplyToMessage( roomId: RoomID, rawMessage: string, @@ -176,6 +212,20 @@ export class MessageService { ); } + @traced( + ( + roomId: RoomID, + content: FileMessageContent, + senderUserId: UserID, + reply?: Reply, + ) => ({ + roomId, + senderUserId, + hasReply: Boolean(reply), + msgtype: content?.msgtype, + mimetype: content?.info?.mimetype, + }), + ) async sendFileMessage( roomId: RoomID, content: FileMessageContent, @@ -206,6 +256,12 @@ export class MessageService { roomVersion, ); + // Add runtime attributes after event is created + addSpanAttributes({ + eventId: event.eventId, + roomVersion, + }); + await this.stateService.handlePdu(event); if (event.rejected) { throw new Error(event.rejectReason); @@ -219,6 +275,20 @@ export class MessageService { /** * @deprecated Use sendMessage and threadEventId/replyToEventId instead */ + @traced( + ( + roomId: RoomID, + rawMessage: string, + _formattedMessage: string, + senderUserId: UserID, + threadRootEventId: EventID, + ) => ({ + roomId, + senderUserId, + threadRootEventId, + messageLength: rawMessage?.length, + }), + ) async sendThreadMessage( roomId: RoomID, rawMessage: string, @@ -249,6 +319,22 @@ export class MessageService { /** * @deprecated Use sendMessage and threadEventId/replyToEventId instead */ + @traced( + ( + roomId: RoomID, + rawMessage: string, + _formattedMessage: string, + senderUserId: UserID, + threadRootEventId: EventID, + eventToReplyTo: EventID, + ) => ({ + roomId, + senderUserId, + threadRootEventId, + eventToReplyTo, + messageLength: rawMessage?.length, + }), + ) async sendReplyToInsideThreadMessage( roomId: RoomID, rawMessage: string, @@ -276,6 +362,14 @@ export class MessageService { ); } + @traced( + (roomId: RoomID, eventId: EventID, emoji: string, senderUserId: UserID) => ({ + roomId, + eventId, + emoji, + senderUserId, + }), + ) async sendReaction( roomId: RoomID, eventId: EventID, @@ -314,6 +408,12 @@ export class MessageService { roomInfo.room_version, ); + // Add runtime attributes after event is created + addSpanAttributes({ + reactionEventId: reactionEvent.eventId, + roomVersion: roomInfo.room_version, + }); + await this.stateService.handlePdu(reactionEvent); void this.federationService.sendEventToAllServersInRoom(reactionEvent); @@ -321,6 +421,19 @@ export class MessageService { return reactionEvent.eventId; } + @traced( + ( + roomId: RoomID, + eventIdReactedTo: EventID, + emoji: string, + senderUserId: UserID, + ) => ({ + roomId, + eventIdReactedTo, + emoji, + senderUserId, + }), + ) async unsetReaction( roomId: RoomID, eventIdReactedTo: EventID, @@ -347,6 +460,12 @@ export class MessageService { roomInfo.room_version, ); + // Add runtime attributes after event is created + addSpanAttributes({ + redactionEventId: redactionEvent.eventId, + roomVersion: roomInfo.room_version, + }); + await this.stateService.handlePdu(redactionEvent); void this.federationService.sendEventToAllServersInRoom(redactionEvent); @@ -354,6 +473,20 @@ export class MessageService { return redactionEvent.eventId; } + @traced( + ( + roomId: RoomID, + rawMessage: string, + _formattedMessage: string, + senderUserId: UserID, + eventIdToReplace: EventID, + ) => ({ + roomId, + senderUserId, + eventIdToReplace, + messageLength: rawMessage?.length, + }), + ) async updateMessage( roomId: RoomID, rawMessage: string, @@ -363,7 +496,7 @@ export class MessageService { ): Promise { const roomInfo = await this.stateService.getRoomInformation(roomId); - const redactionEvent = await this.stateService.buildEvent<'m.room.message'>( + const updateEvent = await this.stateService.buildEvent<'m.room.message'>( { type: 'm.room.message', content: { @@ -392,13 +525,23 @@ export class MessageService { roomInfo.room_version, ); - await this.stateService.handlePdu(redactionEvent); + // Add runtime attributes after event is created + addSpanAttributes({ + updateEventId: updateEvent.eventId, + roomVersion: roomInfo.room_version, + }); - void this.federationService.sendEventToAllServersInRoom(redactionEvent); + await this.stateService.handlePdu(updateEvent); - return redactionEvent.eventId; + void this.federationService.sendEventToAllServersInRoom(updateEvent); + + return updateEvent.eventId; } + @traced((roomId: RoomID, eventIdToRedact: EventID) => ({ + roomId, + eventIdToRedact, + })) async redactMessage( roomId: RoomID, eventIdToRedact: EventID, @@ -436,6 +579,13 @@ export class MessageService { roomInfo.room_version, ); + // Add runtime attributes after event is created + addSpanAttributes({ + redactionEventId: redactionEvent.eventId, + roomVersion: roomInfo.room_version, + originalSender: senderUserId.event.sender, + }); + await this.stateService.handlePdu(redactionEvent); void this.federationService.sendEventToAllServersInRoom(redactionEvent); diff --git a/packages/federation-sdk/src/services/room.service.ts b/packages/federation-sdk/src/services/room.service.ts index cdc320669..c3456f039 100644 --- a/packages/federation-sdk/src/services/room.service.ts +++ b/packages/federation-sdk/src/services/room.service.ts @@ -1,9 +1,9 @@ import { - EventBase, - EventStore, - RoomPowerLevelsEvent, - SignedEvent, - TombstoneAuthEvents, + type EventBase, + type EventStore, + type RoomPowerLevelsEvent, + type SignedEvent, + type TombstoneAuthEvents, createLogger, roomPowerLevelsEvent, } from '@rocket.chat/federation-core'; @@ -18,20 +18,21 @@ import { import { logger } from '@rocket.chat/federation-core'; import { type EventID, - PduForType, - PduJoinRuleEventContent, - PduType, - PersistentEventBase, + type PduForType, + type PduJoinRuleEventContent, + type PduType, + type PersistentEventBase, PersistentEventFactory, - EventStore as RoomEventStore, - RoomID, - RoomVersion, - UserID, + type EventStore as RoomEventStore, + type RoomID, + type RoomVersion, + type UserID, extractDomainFromId, } from '@rocket.chat/federation-room'; import { EventStagingRepository } from '../repositories/event-staging.repository'; import { EventRepository } from '../repositories/event.repository'; import { RoomRepository } from '../repositories/room.repository'; +import { traced, tracedClass } from '../utils/tracing'; import { ConfigService } from './config.service'; import { EventAuthorizationService } from './event-authorization.service'; import { EventEmitterService } from './event-emitter.service'; @@ -46,6 +47,7 @@ import { UnknownRoomError, } from './state.service'; +@tracedClass({ type: 'service', className: 'RoomService' }) @singleton() export class RoomService { private readonly logger = createLogger('RoomService'); @@ -195,6 +197,10 @@ export class RoomService { } } + @traced((roomId: string, state: EventBase[]) => ({ + roomId, + stateEventCount: state?.length, + })) async upsertRoom(roomId: string, state: EventBase[]) { logger.info(`Upserting room ${roomId} with ${state.length} state events`); @@ -230,6 +236,17 @@ export class RoomService { /** * Create a new room with the given sender and username */ + @traced( + ( + username: UserID, + name: string, + joinRule: PduJoinRuleEventContent['join_rule'], + ) => ({ + roomName: name, + creatorUserId: username, + visibility: joinRule, + }), + ) async createRoom( username: UserID, name: string, @@ -369,6 +386,11 @@ export class RoomService { }; } + @traced((roomId: RoomID, newName: string, senderUserId: UserID) => ({ + roomId, + newName, + senderUserId, + })) async updateRoomName(roomId: RoomID, name: string, senderId: UserID) { logger.info( `Updating room name for ${roomId} to \"${name}\" by ${senderId}`, @@ -402,6 +424,11 @@ export class RoomService { return roomNameEvent; } + @traced((roomId: RoomID, senderUserId: UserID, topic: string) => ({ + roomId, + senderUserId, + topicLength: topic?.length, + })) async setRoomTopic(roomId: RoomID, sender: UserID, topic: string) { const roomVersion = await this.stateService.getRoomVersion(roomId); if (!roomVersion) { @@ -641,6 +668,10 @@ export class RoomService { }); } + @traced((roomId: RoomID, userId: UserID) => ({ + roomId, + userId, + })) async leaveRoom(roomId: RoomID, senderId: UserID): Promise { logger.info(`User ${senderId} leaving room ${roomId}`); @@ -701,6 +732,13 @@ export class RoomService { return leaveEvent.eventId; } + @traced( + (roomId: RoomID, kickedUserId: UserID, senderId: UserID, _reason?: string) => ({ + roomId, + senderId, + targetUserId: kickedUserId, + }), + ) async kickUser( roomId: RoomID, kickedUserId: UserID, @@ -826,6 +864,14 @@ export class RoomService { return memberEvent; } + @traced( + (roomId: RoomID, bannedUserId: UserID, senderId: UserID, reason?: string) => ({ + roomId, + senderId, + targetUserId: bannedUserId, + reason, + }), + ) async banUser( roomId: RoomID, bannedUserId: UserID, @@ -1414,6 +1460,7 @@ export class RoomService { }; } + @traced((roomId: RoomID) => ({ roomId })) public async isRoomTombstoned(roomId: RoomID): Promise { try { const room = await this.roomRepository.findOneById(roomId); diff --git a/packages/federation-sdk/src/services/state.service.ts b/packages/federation-sdk/src/services/state.service.ts index c1361e524..a667aac55 100644 --- a/packages/federation-sdk/src/services/state.service.ts +++ b/packages/federation-sdk/src/services/state.service.ts @@ -2,18 +2,18 @@ import { createLogger, signEvent } from '@rocket.chat/federation-core'; import { type EventID, type EventStore, - Pdu, + type Pdu, type PduContent, - PduCreateEventContent, - PduForType, + type PduCreateEventContent, + type PduForType, type PduType, - PduWithHashesAndSignaturesOptional, - PersistentEventBase, + type PduWithHashesAndSignaturesOptional, + type PersistentEventBase, PersistentEventFactory, RejectCode, - RoomID, + type RoomID, RoomState, - RoomVersion, + type RoomVersion, State, type StateID, type StateMapKey, @@ -25,6 +25,7 @@ import { import { delay, inject, singleton } from 'tsyringe'; import { EventRepository } from '../repositories/event.repository'; import { StateGraphRepository } from '../repositories/state-graph.repository'; +import { traced, tracedClass } from '../utils/tracing'; import { ConfigService } from './config.service'; import type { EventService } from './event.service'; type StrippedEvent = { @@ -67,6 +68,7 @@ export class RoomInfoNotReadyError extends Error { } } +@tracedClass({ type: 'service', className: 'StateService' }) @singleton() export class StateService { private readonly logger = createLogger('StateService'); @@ -83,6 +85,7 @@ export class StateService { // TODO: this is a very vague method, better would be to use exactly what needed, // or getCreateEvent. // currently AFAIK mostly is used for just room version + @traced((roomId: string) => ({ roomId })) async getRoomInformation(roomId: string): Promise { const { event, stateId } = (await this.eventRepository.findByRoomIdAndType( @@ -102,6 +105,7 @@ export class StateService { return event.content; } + @traced((roomId: RoomID) => ({ roomId })) async getRoomVersion(roomId: RoomID): Promise { const createEvent = await this.eventRepository.findByRoomIdAndType( roomId, @@ -167,6 +171,7 @@ export class StateService { return stateId; } + @traced((roomId: RoomID) => ({ roomId })) async getLatestRoomState(roomId: RoomID): Promise { const roomVersion = await this.getRoomVersion(roomId); @@ -277,6 +282,17 @@ export class StateService { }; } + @traced( + ( + event: { type: string; room_id: string; sender: string }, + roomVersion: string, + ) => ({ + eventType: event?.type, + roomId: event?.room_id, + sender: event?.sender, + roomVersion, + }), + ) async buildEvent( event: PduWithHashesAndSignaturesOptional>, roomVersion: RoomVersion, @@ -330,6 +346,10 @@ export class StateService { event.addPrevEvents(events); } + @traced((event: PersistentEventBase) => ({ + eventId: event?.eventId, + eventType: event?.type, + })) public async signEvent(event: T) { if (process.env.NODE_ENV === 'test') return event; @@ -585,6 +605,11 @@ export class StateService { // handle received pdu from transaction // implements spec:https://spec.matrix.org/v1.12/server-server-api/#checks-performed-on-receipt-of-a-pdu // TODO: this is not state related, can and should accept timeline events too, move to event service? + @traced((event: PersistentEventBase) => ({ + eventId: event?.eventId, + eventType: event?.type, + roomId: event?.roomId, + })) async handlePdu

(pdu: P): Promise { if (pdu.isCreateEvent()) { this.logger.debug({ eventId: pdu.eventId }, 'handling create event'); @@ -784,6 +809,7 @@ export class StateService { // const stateId = await this.getStateIdBeforeEvent(event); // return this.getStateAtStateId(stateId, event.version); // } + @traced((roomId: RoomID) => ({ roomId })) async getServerSetInRoom(roomId: RoomID) { const state = await this.getLatestRoomState(roomId); @@ -812,6 +838,7 @@ export class StateService { } // @deprecated use getServerSetInRoom + @traced((roomId: RoomID) => ({ roomId })) async getServersInRoom(roomId: RoomID) { return Array.from(await this.getServerSetInRoom(roomId)); } diff --git a/packages/federation-sdk/src/utils/tracing.ts b/packages/federation-sdk/src/utils/tracing.ts new file mode 100644 index 000000000..82fce4983 --- /dev/null +++ b/packages/federation-sdk/src/utils/tracing.ts @@ -0,0 +1,417 @@ +import { SpanStatusCode, context, trace } from '@opentelemetry/api'; +import type { Span, SpanOptions } from '@opentelemetry/api'; + +/** + * Symbol key used to store the attribute extractor on methods + */ +export const TRACE_EXTRACTOR_KEY = Symbol('traceExtractor'); + +/** + * Type for the extractor function stored on decorated methods + */ +export type TraceExtractor = ( + ...args: TArgs +) => Record; + +/** + * Interface for methods that have a trace extractor attached + */ +// biome-ignore lint/complexity/noBannedTypes: Function type needed for method interface +export interface ITracedMethod extends Function { + [TRACE_EXTRACTOR_KEY]?: TraceExtractor; +} + +/** + * Decorator that attaches an attribute extractor to a method for tracing. + * The extractor receives the method arguments and returns attributes to add to the span. + * + * Use this decorator on methods to define inline attribute extraction that + * will be picked up by `@tracedClass`. + * + * @param extractor - Function that extracts trace attributes from method arguments + * + * @example + * @tracedClass({ type: 'service' }) + * class FederationMatrix { + * @traced((room: IRoom, owner: IUser) => ({ + * roomId: room?._id, + * roomName: room?.name || room?.fname, + * ownerId: owner?._id, + * })) + * async createRoom(room: IRoom, owner: IUser) { + * // method implementation + * } + * } + */ +export function traced( + extractor: (...args: TArgs) => Record, +): MethodDecorator { + return (_target, _propertyKey, descriptor: PropertyDescriptor) => { + const originalMethod = descriptor.value as ITracedMethod; + if (originalMethod) { + originalMethod[TRACE_EXTRACTOR_KEY] = extractor as TraceExtractor; + } + return descriptor; + }; +} + +/** + * Get the trace extractor from a method, if one was attached via @traced decorator. + */ +function getTraceExtractor(method: unknown): TraceExtractor | undefined { + if (typeof method === 'function') { + return (method as ITracedMethod)[TRACE_EXTRACTOR_KEY]; + } + return undefined; +} + +/** + * Options for @tracedClass decorator + */ +export interface ITracedClassOptions { + /** + * The type prefix for span names (e.g., 'model', 'service', 'handler') + */ + type: string; + + /** + * The class name to use in span names. Required because minification + * mangles constructor.name. + */ + className: string; + + /** + * Array of method names to exclude from tracing + */ + ignoreMethods?: string[]; +} + +/** + * Sanitize arguments for tracing, filtering out large objects and mongo sessions + */ +const sanitizeArguments = (args: unknown[]): unknown[] => { + return args.map((arg) => { + // Filter out mongo sessions + if (typeof arg === 'object' && arg != null && 'session' in arg) { + return '[mongo options with session]'; + } + // For large objects, include first 10 keys and indicate more were skipped + if (typeof arg === 'object' && arg !== null) { + const keys = Object.keys(arg); + if (keys.length > 10) { + const limitedObject: Record = {}; + // Include first 10 keys + for (let i = 0; i < 10; i++) { + limitedObject[keys[i]] = (arg as Record)[keys[i]]; + } + // Add indicator that more keys were skipped + const skippedKeysKey = '_skipped_keys'; + limitedObject[skippedKeysKey] = keys.length - 10; + return limitedObject; + } + } + return arg; + }); +}; + +/** + * Execute a function within a traced span if there's an active context. + * This ensures SDK spans are children of the calling application's spans. + * + * @param name - The name of the span + * @param options - Span options including attributes + * @param fn - The function to execute + * @returns The result of the function + */ +export function tracerActiveSpan ReturnType>( + name: string, + options: SpanOptions, + fn: F, +): ReturnType { + const tracer = trace.getTracer('@rocket.chat/federation-sdk'); + const currentSpan = trace.getSpan(context.active()); + + // If there's no active span, just execute the function without tracing + if (!currentSpan) { + return fn(); + } + + const computeResult = (span: Span) => { + try { + const result = fn(span); + if (result instanceof Promise) { + result + .catch((err: unknown) => { + span.recordException(err as Error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err instanceof Error ? err.message : String(err), + }); + }) + .finally(() => span.end()); + + return result; + } + + span.end(); + return result; + } catch (err: unknown) { + span.recordException(err as Error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err instanceof Error ? err.message : String(err), + }); + span.end(); + throw err; + } + }; + + return tracer.startActiveSpan(name, options, computeResult); +} + +/** + * Class decorator that automatically wraps all methods with OpenTelemetry tracing spans. + * This decorator wraps methods at the prototype level, making it compatible with + * dependency injection frameworks like tsyringe. + * + * @param options - Configuration options for tracing + * + * @example + * @tracedClass({ type: 'service', className: 'MyService' }) + * class FederationMatrix extends ServiceClass { + * @traced((room: IRoom, owner: IUser) => ({ roomId: room?._id })) + * async createRoom(room: IRoom, owner: IUser) { ... } + * } + * + * @example + * @tracedClass({ type: 'repository', className: 'UsersRaw' }) + * class UsersRaw extends BaseRaw { ... } + */ +export function tracedClass( + options: ITracedClassOptions, + // biome-ignore lint/complexity/noBannedTypes: Function type needed for class decorator +): (target: TFunction) => TFunction { + const { type, className, ignoreMethods = [] } = options; + + // biome-ignore lint/complexity/noBannedTypes: Function type needed for class decorator + return (target: TFunction): TFunction => { + const prototype = target.prototype; + + // Get methods from entire prototype chain (excluding Object.prototype) + let proto = prototype; + const methodNames = new Set(); + while (proto && proto !== Object.prototype) { + for (const name of Object.getOwnPropertyNames(proto)) { + if ( + name !== 'constructor' && + typeof proto[name] === 'function' && + !ignoreMethods.includes(name) + ) { + methodNames.add(name); + } + } + proto = Object.getPrototypeOf(proto); + } + + // Wrap each method with tracing + for (const methodName of methodNames) { + const originalMethod = prototype[methodName]; + + prototype[methodName] = function ( + this: unknown, + ...args: unknown[] + ): unknown { + const attributes: Record = { + [type]: className, + method: methodName, + }; + + // Check for @traced decorator extractor + const extractor = getTraceExtractor(originalMethod); + + if (extractor) { + try { + const extractedAttrs = extractor(...args); + Object.assign(attributes, extractedAttrs); + } catch { + // If extractor fails, continue with base attributes + } + } else { + attributes.parameters = sanitizeArguments(args); + } + + return tracerActiveSpan( + `homeserver-sdk ${type} ${className}.${methodName}`, + { + attributes: attributes as Record< + string, + string | number | boolean | undefined + >, + }, + () => { + return originalMethod.apply(this, args); + }, + ); + }; + + // Preserve the original method's name and any attached metadata (like @traced extractors) + Object.defineProperty(prototype[methodName], 'name', { + value: methodName, + }); + + // Copy over the trace extractor if it exists + const extractor = getTraceExtractor(originalMethod); + if (extractor) { + (prototype[methodName] as ITracedMethod)[TRACE_EXTRACTOR_KEY] = + extractor; + } + } + + return target; + }; +} + +/** + * Add attributes to the currently active span. + * Use this inside methods to add runtime information discovered during execution, + * such as computed values, data fetched from DB, or other contextual info. + * + * @param attributes - Key-value pairs to add to the current span (string, number, boolean, or undefined values only) + * + * @example + * async sendMessage(roomId, message, ...) { + * const event = await this.stateService.buildEvent(...); + * + * // Add runtime info after we have it + * addSpanAttributes({ + * eventId: event.eventId, + * eventType: event.type, + * }); + * } + */ +export function addSpanAttributes( + attributes: Record, +): void { + const span = trace.getActiveSpan(); + if (span) { + span.setAttributes(attributes); + } +} + +/** + * Check if there's an active tracing context. + * Useful for conditional logic based on whether tracing is active. + */ +export function hasActiveSpan(): boolean { + return trace.getActiveSpan() !== undefined; +} + +/** + * Extract attributes from event emitter event data based on event type. + * This function extracts relevant debugging information from event payloads + * to add as span attributes when events are emitted. + * + * @param eventType - The event type being emitted (e.g., 'homeserver.matrix.message') + * @param data - The event data payload + * @returns Record of attributes to add to the span + */ +export function extractEventEmitterAttributes( + eventType: string, + data: unknown, +): Record { + const attributes: Record = { + 'event.type': eventType, + }; + + if (!data || typeof data !== 'object') { + return attributes; + } + + const eventData = data as Record; + + // Extract common fields that appear in most events + if ('event_id' in eventData && typeof eventData.event_id === 'string') { + attributes['event.id'] = eventData.event_id; + } + + if ('room_id' in eventData && typeof eventData.room_id === 'string') { + attributes['room.id'] = eventData.room_id; + } + + if ('user_id' in eventData && typeof eventData.user_id === 'string') { + attributes['user.id'] = eventData.user_id; + } + + if ('sender_id' in eventData && typeof eventData.sender_id === 'string') { + attributes['sender.id'] = eventData.sender_id; + } + + // Extract nested event data if present + if ( + 'event' in eventData && + typeof eventData.event === 'object' && + eventData.event !== null + ) { + const nestedEvent = eventData.event as Record; + + if ('room_id' in nestedEvent && typeof nestedEvent.room_id === 'string') { + attributes['room.id'] = nestedEvent.room_id; + } + + if ('sender' in nestedEvent && typeof nestedEvent.sender === 'string') { + attributes['sender.id'] = nestedEvent.sender; + } + + if ('type' in nestedEvent && typeof nestedEvent.type === 'string') { + attributes['matrix.event.type'] = nestedEvent.type; + } + + if ( + 'state_key' in nestedEvent && + typeof nestedEvent.state_key === 'string' + ) { + attributes['state.key'] = nestedEvent.state_key; + } + } + + // Event-specific attribute extraction + switch (eventType) { + case 'homeserver.matrix.typing': + if ('typing' in eventData && typeof eventData.typing === 'boolean') { + attributes.typing = eventData.typing; + } + if ('origin' in eventData && typeof eventData.origin === 'string') { + attributes.origin = eventData.origin; + } + break; + + case 'homeserver.matrix.presence': + if ('presence' in eventData && typeof eventData.presence === 'string') { + attributes['presence.state'] = eventData.presence; + } + if ( + 'last_active_ago' in eventData && + typeof eventData.last_active_ago === 'number' + ) { + attributes['presence.last_active_ago'] = eventData.last_active_ago; + } + if ('origin' in eventData && typeof eventData.origin === 'string') { + attributes.origin = eventData.origin; + } + break; + + case 'homeserver.matrix.room.role': + if ('role' in eventData && typeof eventData.role === 'string') { + attributes.role = eventData.role; + } + break; + + case 'homeserver.ping': + if ('message' in eventData && typeof eventData.message === 'string') { + attributes['ping.message'] = eventData.message; + } + break; + } + + return attributes; +} From abf82ee353826177fec39909b9da9068bff6041c Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 18 Jan 2026 15:49:52 -0300 Subject: [PATCH 2/6] refactor: fixing linting issues --- .../repositories/event-staging.repository.ts | 6 +++++- .../src/services/event-emitter.service.ts | 2 +- .../services/federation-request.service.ts | 12 +++++------ .../src/services/federation.service.ts | 14 +++++++------ .../src/services/invite.service.ts | 21 +++++++++---------- .../src/services/message.service.ts | 7 ++++++- .../src/services/room.service.ts | 14 +++++++++++-- 7 files changed, 47 insertions(+), 29 deletions(-) diff --git a/packages/federation-sdk/src/repositories/event-staging.repository.ts b/packages/federation-sdk/src/repositories/event-staging.repository.ts index 984918518..46b7373d9 100644 --- a/packages/federation-sdk/src/repositories/event-staging.repository.ts +++ b/packages/federation-sdk/src/repositories/event-staging.repository.ts @@ -1,6 +1,10 @@ import { generateId } from '@rocket.chat/federation-core'; import type { EventStagingStore } from '@rocket.chat/federation-core'; -import { type EventID, type Pdu, type RoomID } from '@rocket.chat/federation-room'; +import { + type EventID, + type Pdu, + type RoomID, +} from '@rocket.chat/federation-room'; import type { Collection, DeleteResult, UpdateResult } from 'mongodb'; import { inject, singleton } from 'tsyringe'; import { tracedClass } from '../utils/tracing'; diff --git a/packages/federation-sdk/src/services/event-emitter.service.ts b/packages/federation-sdk/src/services/event-emitter.service.ts index af89fe47c..f03fd2bae 100644 --- a/packages/federation-sdk/src/services/event-emitter.service.ts +++ b/packages/federation-sdk/src/services/event-emitter.service.ts @@ -1,3 +1,4 @@ +import { SpanStatusCode, context, trace } from '@opentelemetry/api'; import { AsyncDispatcher, type EventHandlerOf, @@ -5,7 +6,6 @@ import { logger, } from '@rocket.chat/federation-core'; import { singleton } from 'tsyringe'; -import { SpanStatusCode, context, trace } from '@opentelemetry/api'; import type { HomeserverEventSignatures } from '..'; import { extractEventEmitterAttributes } from '../utils/tracing'; diff --git a/packages/federation-sdk/src/services/federation-request.service.ts b/packages/federation-sdk/src/services/federation-request.service.ts index ea7084eda..907e6f12c 100644 --- a/packages/federation-sdk/src/services/federation-request.service.ts +++ b/packages/federation-sdk/src/services/federation-request.service.ts @@ -56,13 +56,11 @@ export class FederationRequestService { constructor(private readonly configService: ConfigService) {} - @traced( - (params: { method: string; domain: string; uri: string }) => ({ - method: params?.method, - targetDomain: params?.domain, - uri: params?.uri, - }), - ) + @traced((params: { method: string; domain: string; uri: string }) => ({ + method: params?.method, + targetDomain: params?.domain, + uri: params?.uri, + })) async makeSignedRequest({ method, domain, diff --git a/packages/federation-sdk/src/services/federation.service.ts b/packages/federation-sdk/src/services/federation.service.ts index d944e0006..d93178281 100644 --- a/packages/federation-sdk/src/services/federation.service.ts +++ b/packages/federation-sdk/src/services/federation.service.ts @@ -37,12 +37,14 @@ export class FederationService { /** * Get a make_join template for a room and user */ - @traced((domain: string, roomId: string, userId: string, version?: string) => ({ - targetDomain: domain, - roomId, - userId, - version, - })) + @traced( + (domain: string, roomId: string, userId: string, version?: string) => ({ + targetDomain: domain, + roomId, + userId, + version, + }), + ) async makeJoin( domain: string, roomId: string, diff --git a/packages/federation-sdk/src/services/invite.service.ts b/packages/federation-sdk/src/services/invite.service.ts index 6732aae75..8bed94cab 100644 --- a/packages/federation-sdk/src/services/invite.service.ts +++ b/packages/federation-sdk/src/services/invite.service.ts @@ -46,7 +46,12 @@ export class InviteService { * Invite a user to an existing room */ @traced( - (userId: UserID, roomId: RoomID, sender: UserID, isDirectMessage?: boolean) => ({ + ( + userId: UserID, + roomId: RoomID, + sender: UserID, + isDirectMessage?: boolean, + ) => ({ userId, roomId, sender, @@ -191,16 +196,10 @@ export class InviteService { } } - @traced( - ( - _event: unknown, - eventId: EventID, - roomVersion: RoomVersion, - ) => ({ - eventId, - roomVersion, - }), - ) + @traced((_event: unknown, eventId: EventID, roomVersion: RoomVersion) => ({ + eventId, + roomVersion, + })) async processInvite( event: PduForType<'m.room.member'>, eventId: EventID, diff --git a/packages/federation-sdk/src/services/message.service.ts b/packages/federation-sdk/src/services/message.service.ts index 9779642dd..a71f6ae7b 100644 --- a/packages/federation-sdk/src/services/message.service.ts +++ b/packages/federation-sdk/src/services/message.service.ts @@ -363,7 +363,12 @@ export class MessageService { } @traced( - (roomId: RoomID, eventId: EventID, emoji: string, senderUserId: UserID) => ({ + ( + roomId: RoomID, + eventId: EventID, + emoji: string, + senderUserId: UserID, + ) => ({ roomId, eventId, emoji, diff --git a/packages/federation-sdk/src/services/room.service.ts b/packages/federation-sdk/src/services/room.service.ts index c3456f039..9fd08ea99 100644 --- a/packages/federation-sdk/src/services/room.service.ts +++ b/packages/federation-sdk/src/services/room.service.ts @@ -733,7 +733,12 @@ export class RoomService { } @traced( - (roomId: RoomID, kickedUserId: UserID, senderId: UserID, _reason?: string) => ({ + ( + roomId: RoomID, + kickedUserId: UserID, + senderId: UserID, + _reason?: string, + ) => ({ roomId, senderId, targetUserId: kickedUserId, @@ -865,7 +870,12 @@ export class RoomService { } @traced( - (roomId: RoomID, bannedUserId: UserID, senderId: UserID, reason?: string) => ({ + ( + roomId: RoomID, + bannedUserId: UserID, + senderId: UserID, + reason?: string, + ) => ({ roomId, senderId, targetUserId: bannedUserId, From b28d29b3738de1272823300b775916e258cf7c8c Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 18 Jan 2026 19:49:33 -0300 Subject: [PATCH 3/6] chore: specify bun version file in CI workflow --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 59af57043..731b68fd7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,6 +27,8 @@ jobs: restore-keys: | ${{ runner.os }}-turbo- - uses: oven-sh/setup-bun@v2 + with: + bun-version-file: package.json - run: bun install - run: bun run build - run: bun lint:ci From 1a365cbad5a51bb7c36f0065cc213bdff2e575e8 Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 18 Jan 2026 20:10:02 -0300 Subject: [PATCH 4/6] chore: update bun package manager version to 1.2.0 in package.json --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 5e5ec4a37..610998226 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "pre-commit": "bun test" } }, - "packageManager": "bun@1.1.10", + "packageManager": "bun@1.2.0", "lint-staged": { "**.{js|ts|cjs|mjs|d.cts|d.mts|jsx|tsx|json|jsonc}": [ "biome check --files-ignore-unknown=true --diagnostic-level=error", From 3465db6df31f2686f5bcbbd46dab654b69196dfc Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 18 Jan 2026 20:22:01 -0300 Subject: [PATCH 5/6] chore: update bun package manager version to 1.2.5 in package.json so test.failing() is recognized --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 610998226..7d7159392 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "pre-commit": "bun test" } }, - "packageManager": "bun@1.2.0", + "packageManager": "bun@1.2.5", "lint-staged": { "**.{js|ts|cjs|mjs|d.cts|d.mts|jsx|tsx|json|jsonc}": [ "biome check --files-ignore-unknown=true --diagnostic-level=error", From c9dbc51a638ce80a8e2d50286c70a1ff294109a5 Mon Sep 17 00:00:00 2001 From: dhulke Date: Sun, 18 Jan 2026 21:56:38 -0300 Subject: [PATCH 6/6] chore: update bun package manager version to 1.2.23 in package.json and remove bun-version-file from CI workflow --- .github/workflows/ci.yml | 2 -- package.json | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 731b68fd7..59af57043 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -27,8 +27,6 @@ jobs: restore-keys: | ${{ runner.os }}-turbo- - uses: oven-sh/setup-bun@v2 - with: - bun-version-file: package.json - run: bun install - run: bun run build - run: bun lint:ci diff --git a/package.json b/package.json index 7d7159392..5f145906f 100644 --- a/package.json +++ b/package.json @@ -31,7 +31,7 @@ "pre-commit": "bun test" } }, - "packageManager": "bun@1.2.5", + "packageManager": "bun@1.2.23", "lint-staged": { "**.{js|ts|cjs|mjs|d.cts|d.mts|jsx|tsx|json|jsonc}": [ "biome check --files-ignore-unknown=true --diagnostic-level=error",