From 5e24d2d9dd252ca18e2aac2db3d6b74dbe02e398 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Sun, 12 Oct 2025 11:39:10 -0300 Subject: [PATCH] chore: limit staging area processing by 10 tries --- packages/core/src/models/event.model.ts | 1 + .../repositories/event-staging.repository.ts | 21 +++++- .../src/server-discovery/discovery.ts | 2 +- .../src/services/staging-area.service.ts | 68 +++++++++++-------- 4 files changed, 59 insertions(+), 33 deletions(-) diff --git a/packages/core/src/models/event.model.ts b/packages/core/src/models/event.model.ts index 456c448e..a28a2b51 100644 --- a/packages/core/src/models/event.model.ts +++ b/packages/core/src/models/event.model.ts @@ -39,6 +39,7 @@ export interface EventStore extends PersistentEventBase { export interface EventStagingStore extends PersistentEventBase { roomId: string; + got: number; from: 'join' | 'transaction'; } diff --git a/packages/federation-sdk/src/repositories/event-staging.repository.ts b/packages/federation-sdk/src/repositories/event-staging.repository.ts index 40c95d15..f814bcdd 100644 --- a/packages/federation-sdk/src/repositories/event-staging.repository.ts +++ b/packages/federation-sdk/src/repositories/event-staging.repository.ts @@ -10,7 +10,12 @@ export class EventStagingRepository { @inject('EventStagingCollection') private readonly collection: Collection, ) { - this.collection.createIndex({ roomId: 1, createdAt: 1 }); + this.collection.createIndex({ + roomId: 1, + got: 1, + 'event.depth': 1, + createdAt: 1, + }); } async create( @@ -30,6 +35,7 @@ export class EventStagingRepository { $setOnInsert: { roomId: event.room_id, createdAt: new Date(), + got: 0, }, $set: { event, @@ -43,9 +49,18 @@ export class EventStagingRepository { } getLeastDepthEventForRoom(roomId: string): Promise { - return this.collection.findOne( + return this.collection.findOneAndUpdate( { roomId }, - { sort: { 'event.depth': 1, createdAt: 1 } }, + { + $inc: { + got: 1, + }, + }, + { + sort: { got: 1, 'event.depth': 1, createdAt: 1 }, + upsert: false, + returnDocument: 'before', + }, ); } diff --git a/packages/federation-sdk/src/server-discovery/discovery.ts b/packages/federation-sdk/src/server-discovery/discovery.ts index 88d0987c..1d9ba147 100644 --- a/packages/federation-sdk/src/server-discovery/discovery.ts +++ b/packages/federation-sdk/src/server-discovery/discovery.ts @@ -37,7 +37,7 @@ const SERVER_DISCOVERY_CACHE_MAX_AGE = } throw new Error('Invalid SERVER_DISCOVERY_CACHE_MAX_AGE value'); - })(process.env.SERVER_DISCOVERY_CACHE_MAX_AGE) ?? 3_600_000; // default to 1 hour + })(process.env.SERVER_DISCOVERY_CACHE_MAX_AGE) ?? 300_000; // default to 5 minutes // should only be needed if input is from a dns server function fix6(addr: string): `[${string}]` { diff --git a/packages/federation-sdk/src/services/staging-area.service.ts b/packages/federation-sdk/src/services/staging-area.service.ts index 834d2419..4cde494f 100644 --- a/packages/federation-sdk/src/services/staging-area.service.ts +++ b/packages/federation-sdk/src/services/staging-area.service.ts @@ -26,6 +26,18 @@ import { FederationService } from './federation.service'; import { MissingEventService } from './missing-event.service'; import { PartialStateResolutionError, StateService } from './state.service'; +const MAX_EVENT_RETRY = + ((maxRetry?: string) => { + if (!maxRetry) return; + + const n = Number.parseInt(maxRetry, 10); + if (!Number.isNaN(n) && n >= 0) { + return n; + } + + throw new Error('Invalid MAX_EVENT_RETRY value'); + })(process.env.MAX_EVENT_RETRY) ?? 10; + class MissingAuthorizationEventsError extends Error { constructor(message: string) { super(message); @@ -62,16 +74,6 @@ export class StagingAreaService { } async processEventForRoom(roomId: string) { - let event = await this.eventService.getLeastDepthEventForRoom(roomId); - if (!event) { - this.logger.debug({ msg: 'No staged event found for room', roomId }); - await this.lockRepository.releaseLock( - roomId, - this.configService.instanceId, - ); - return; - } - const roomIdToRoomVersion = new Map(); const getRoomVersion = async (roomId: string) => { if (roomIdToRoomVersion.has(roomId)) { @@ -88,9 +90,32 @@ export class StagingAreaService { return PersistentEventFactory.createFromRawEvent(pdu, version); }; - while (event) { + let event: EventStagingStore | null = null; + + do { + event = await this.eventService.getLeastDepthEventForRoom(roomId); + if (!event) { + this.logger.debug({ msg: 'No staged event found for room', roomId }); + break; + } + + if (event.got > MAX_EVENT_RETRY) { + this.logger.warn( + `Event ${event._id} has been tried ${MAX_EVENT_RETRY} times, removing from staging area`, + ); + await this.eventService.markEventAsUnstaged(event); + continue; + } + this.logger.info({ msg: 'Processing event', eventId: event._id }); + // if we got an event, we need to update the lock's timestamp to avoid it being timed out + // and acquired by another instance while we're processing a batch of events for this room + await this.lockRepository.updateLockTimestamp( + roomId, + this.configService.instanceId, + ); + try { const addedMissing = await this.processDependencyStage(event); if (addedMissing) { @@ -123,33 +148,18 @@ export class StagingAreaService { }); } else if (err instanceof MissingEventsError) { this.logger.info({ - msg: 'Added missing events, postponing current event processing', + msg: 'Added missing events, postponing event processing', eventId: event._id, }); } else { this.logger.error({ - msg: 'Error processing event', + msg: 'Error processing event, postponing event processing', event, err, }); - - await this.eventService.markEventAsUnstaged(event); } } - - // TODO: what should we do to avoid infinite loops in case the next event is always the same event - - event = await this.eventService.getLeastDepthEventForRoom(roomId); - - // if we got an event, we need to update the lock's timestamp to avoid it being timed out - // and acquired by another instance while we're processing a batch of events for this room - if (event) { - await this.lockRepository.updateLockTimestamp( - roomId, - this.configService.instanceId, - ); - } - } + } while (event); // release the lock after processing await this.lockRepository.releaseLock(