From 5d7a4affbab4d0c9b61d65e9967e8db031f7ebb2 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Tue, 23 Sep 2025 22:04:22 -0300 Subject: [PATCH 1/6] fix: missing events not being processed properly --- packages/federation-sdk/src/services/event.service.ts | 2 +- packages/federation-sdk/src/services/missing-event.service.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/federation-sdk/src/services/event.service.ts b/packages/federation-sdk/src/services/event.service.ts index c73caa478..c36c58295 100644 --- a/packages/federation-sdk/src/services/event.service.ts +++ b/packages/federation-sdk/src/services/event.service.ts @@ -154,7 +154,7 @@ export class EventService { } } - private async processIncomingPDUs( + async processIncomingPDUs( origin: string, pdus: Pdu[], ): Promise { diff --git a/packages/federation-sdk/src/services/missing-event.service.ts b/packages/federation-sdk/src/services/missing-event.service.ts index 8be0709ef..64231a37b 100644 --- a/packages/federation-sdk/src/services/missing-event.service.ts +++ b/packages/federation-sdk/src/services/missing-event.service.ts @@ -50,7 +50,7 @@ export class MissingEventService { this.logger.debug(`Persisting fetched missing event ${eventId}`); // TODO is there anything else we need to do with missing dependencies from received event? - await this.stateService.persistEvent(event); + await this.eventService.processIncomingPDUs(origin, [event]); } } catch (err: unknown) { this.logger.error( From 08c726f0530a8e084af02732a06ac39280943a0f Mon Sep 17 00:00:00 2001 From: Guilherme Gazzo Date: Tue, 23 Sep 2025 23:13:47 -0300 Subject: [PATCH 2/6] lint --- packages/federation-sdk/src/services/event.service.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/packages/federation-sdk/src/services/event.service.ts b/packages/federation-sdk/src/services/event.service.ts index c36c58295..54ccca19d 100644 --- a/packages/federation-sdk/src/services/event.service.ts +++ b/packages/federation-sdk/src/services/event.service.ts @@ -154,10 +154,7 @@ export class EventService { } } - async processIncomingPDUs( - origin: string, - pdus: Pdu[], - ): Promise { + async processIncomingPDUs(origin: string, pdus: Pdu[]): Promise { // organize events by room id const eventsByRoomId = new Map(); for (const event of pdus) { From 25b4f076989fa22c287737fb6a55e0ba497a201c Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Wed, 24 Sep 2025 00:54:55 -0300 Subject: [PATCH 3/6] fix: DMs first message not working --- .../federation-sdk/src/services/missing-event.service.ts | 6 +++++- packages/federation-sdk/src/services/room.service.ts | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/federation-sdk/src/services/missing-event.service.ts b/packages/federation-sdk/src/services/missing-event.service.ts index 64231a37b..b5f1e71cb 100644 --- a/packages/federation-sdk/src/services/missing-event.service.ts +++ b/packages/federation-sdk/src/services/missing-event.service.ts @@ -23,7 +23,11 @@ export class MissingEventService { ) {} async fetchMissingEvent(data: MissingEventType) { - const { eventId, roomId, origin } = data; + const { eventId, roomId } = data; + + // TODO: check how to do this properly. This fixes the fist message of a DM not being fetched + // Get origin from roomId since the origin of the event may be self + const origin = roomId.split(':').pop() || data.origin; const exists = await this.eventService.getEventById(eventId); if (exists) { diff --git a/packages/federation-sdk/src/services/room.service.ts b/packages/federation-sdk/src/services/room.service.ts index 8f65ae639..399b18857 100644 --- a/packages/federation-sdk/src/services/room.service.ts +++ b/packages/federation-sdk/src/services/room.service.ts @@ -987,7 +987,7 @@ export class RoomService { ); // try to persist the join event now, should succeed with state in place - await stateService.persistStateEvent(joinEventFinal); + await this.eventService.processIncomingPDUs(joinEventFinal.origin, [joinEventFinal.event]); if (joinEventFinal.rejected) { throw new Error(joinEventFinal.rejectedReason); From a11afef409ea7b4f8532dcb5213f9b4f270d57a0 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Wed, 24 Sep 2025 11:52:07 -0300 Subject: [PATCH 4/6] Improve logic --- .../federation-sdk/src/services/event.service.ts | 12 +++++++++++- .../src/services/missing-event.service.ts | 6 +----- packages/federation-sdk/src/services/room.service.ts | 2 +- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/packages/federation-sdk/src/services/event.service.ts b/packages/federation-sdk/src/services/event.service.ts index 54ccca19d..80a8c8511 100644 --- a/packages/federation-sdk/src/services/event.service.ts +++ b/packages/federation-sdk/src/services/event.service.ts @@ -262,7 +262,17 @@ export class EventService { throw new Error('M_MISSING_SIGNATURES_OR_HASHES'); } - await checkSignAndHashes(event, origin, getPublicKeyFromServer); + let originToValidateSignatures = origin; + + // If the event does not have a signature for the origin server, + // but has a signature for our server, we validate using our server name. + // This happens on sendJoin process, where the join event is signed by our server, + // but the origin is the remote server since it just returns the event as we sent it. + if (!event.signatures[origin] && event.signatures[this.configService.serverName]) { + originToValidateSignatures = this.configService.serverName; + } + + await checkSignAndHashes(event, originToValidateSignatures, getPublicKeyFromServer); } private async processIncomingEDUs(edus: BaseEDU[]): Promise { diff --git a/packages/federation-sdk/src/services/missing-event.service.ts b/packages/federation-sdk/src/services/missing-event.service.ts index b5f1e71cb..64231a37b 100644 --- a/packages/federation-sdk/src/services/missing-event.service.ts +++ b/packages/federation-sdk/src/services/missing-event.service.ts @@ -23,11 +23,7 @@ export class MissingEventService { ) {} async fetchMissingEvent(data: MissingEventType) { - const { eventId, roomId } = data; - - // TODO: check how to do this properly. This fixes the fist message of a DM not being fetched - // Get origin from roomId since the origin of the event may be self - const origin = roomId.split(':').pop() || data.origin; + const { eventId, roomId, origin } = data; const exists = await this.eventService.getEventById(eventId); if (exists) { diff --git a/packages/federation-sdk/src/services/room.service.ts b/packages/federation-sdk/src/services/room.service.ts index 399b18857..02ca4135e 100644 --- a/packages/federation-sdk/src/services/room.service.ts +++ b/packages/federation-sdk/src/services/room.service.ts @@ -987,7 +987,7 @@ export class RoomService { ); // try to persist the join event now, should succeed with state in place - await this.eventService.processIncomingPDUs(joinEventFinal.origin, [joinEventFinal.event]); + await this.eventService.processIncomingPDUs(residentServer || joinEventFinal.origin, [joinEventFinal.event]); if (joinEventFinal.rejected) { throw new Error(joinEventFinal.rejectedReason); From 1f13062ca0456d3c71d4d065fc00c5b18f54ef71 Mon Sep 17 00:00:00 2001 From: Rodrigo Nascimento Date: Wed, 24 Sep 2025 12:13:36 -0300 Subject: [PATCH 5/6] Fixes lint --- .../federation-sdk/src/services/event.service.ts | 15 +++++++++++---- .../federation-sdk/src/services/room.service.ts | 5 ++++- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/packages/federation-sdk/src/services/event.service.ts b/packages/federation-sdk/src/services/event.service.ts index 80a8c8511..0fe5cb757 100644 --- a/packages/federation-sdk/src/services/event.service.ts +++ b/packages/federation-sdk/src/services/event.service.ts @@ -263,16 +263,23 @@ export class EventService { } let originToValidateSignatures = origin; - + // If the event does not have a signature for the origin server, // but has a signature for our server, we validate using our server name. // This happens on sendJoin process, where the join event is signed by our server, // but the origin is the remote server since it just returns the event as we sent it. - if (!event.signatures[origin] && event.signatures[this.configService.serverName]) { + if ( + !event.signatures[origin] && + event.signatures[this.configService.serverName] + ) { originToValidateSignatures = this.configService.serverName; } - - await checkSignAndHashes(event, originToValidateSignatures, getPublicKeyFromServer); + + await checkSignAndHashes( + event, + originToValidateSignatures, + getPublicKeyFromServer, + ); } private async processIncomingEDUs(edus: BaseEDU[]): Promise { diff --git a/packages/federation-sdk/src/services/room.service.ts b/packages/federation-sdk/src/services/room.service.ts index 02ca4135e..222e87132 100644 --- a/packages/federation-sdk/src/services/room.service.ts +++ b/packages/federation-sdk/src/services/room.service.ts @@ -987,7 +987,10 @@ export class RoomService { ); // try to persist the join event now, should succeed with state in place - await this.eventService.processIncomingPDUs(residentServer || joinEventFinal.origin, [joinEventFinal.event]); + await this.eventService.processIncomingPDUs( + residentServer || joinEventFinal.origin, + [joinEventFinal.event], + ); if (joinEventFinal.rejected) { throw new Error(joinEventFinal.rejectedReason); From d03afd879651f5f0cad0a65fcb86c268d9306799 Mon Sep 17 00:00:00 2001 From: Guilherme Gazzo Date: Wed, 24 Sep 2025 15:02:06 -0300 Subject: [PATCH 6/6] chore: throw error for missing authorization events --- .../repositories/event-staging.repository.ts | 7 +++ .../src/services/event.service.ts | 6 +++ .../src/services/staging-area.service.ts | 45 +++++++++++++++---- 3 files changed, 49 insertions(+), 9 deletions(-) diff --git a/packages/federation-sdk/src/repositories/event-staging.repository.ts b/packages/federation-sdk/src/repositories/event-staging.repository.ts index 93de3d0e9..e2d9168e1 100644 --- a/packages/federation-sdk/src/repositories/event-staging.repository.ts +++ b/packages/federation-sdk/src/repositories/event-staging.repository.ts @@ -40,6 +40,13 @@ export class EventStagingRepository { ); } + getLeastDepthEventForRoom(roomId: string): Promise { + return this.collection.findOne( + { roomId }, + { sort: { 'event.depth': 1, createdAt: 1 } }, + ); + } + removeByEventId(eventId: EventID): Promise { return this.collection.deleteOne({ _id: eventId }); } diff --git a/packages/federation-sdk/src/services/event.service.ts b/packages/federation-sdk/src/services/event.service.ts index 0fe5cb757..552ccbb48 100644 --- a/packages/federation-sdk/src/services/event.service.ts +++ b/packages/federation-sdk/src/services/event.service.ts @@ -99,6 +99,12 @@ export class EventService { ); } + async getLeastDepthEventForRoom( + roomId: string, + ): Promise { + return this.eventStagingRepository.getLeastDepthEventForRoom(roomId); + } + async getNextStagedEventForRoom( roomId: string, ): Promise { diff --git a/packages/federation-sdk/src/services/staging-area.service.ts b/packages/federation-sdk/src/services/staging-area.service.ts index ffc6d1c70..0c166f987 100644 --- a/packages/federation-sdk/src/services/staging-area.service.ts +++ b/packages/federation-sdk/src/services/staging-area.service.ts @@ -21,6 +21,13 @@ import { ConfigService } from './config.service'; import { MissingEventService } from './missing-event.service'; import { StateService } from './state.service'; +class MissingAuthorizationEventsError extends Error { + constructor(message: string) { + super(message); + this.name = 'MissingAuthorizationEventsError'; + } +} + @singleton() export class StagingAreaService { private readonly logger = createLogger('StagingAreaService'); @@ -38,11 +45,11 @@ export class StagingAreaService { extractEventsFromIncomingPDU(pdu: EventBase) { const authEvents = pdu.auth_events || []; const prevEvents = pdu.prev_events || []; - return [...authEvents, ...prevEvents]; + return [authEvents, prevEvents]; } async processEventForRoom(roomId: string) { - let event = await this.eventService.getNextStagedEventForRoom(roomId); + let event = await this.eventService.getLeastDepthEventForRoom(roomId); if (!event) { this.logger.debug({ msg: 'No staged event found for room', roomId }); await this.lockRepository.releaseLock( @@ -65,13 +72,23 @@ export class StagingAreaService { // TODO add missing logic from synapse: Prune the event queue if it's getting large. } catch (err: unknown) { - this.logger.error({ - msg: 'Error processing event', - err, - }); + if (err instanceof MissingAuthorizationEventsError) { + this.logger.info({ + msg: 'Missing events, postponing event processing', + eventId: event._id, + err, + }); + } else { + this.logger.error({ + msg: 'Error processing event', + err, + }); + } } - event = await this.eventService.getNextStagedEventForRoom(roomId); + // TODO: what should we do to avoid infinite loops in case the next event is always the same event + + event = await this.eventService.getLeastDepthEventForRoom(roomId); // if we got an event, we need to update the lock's timestamp to avoid it being timed out // and acquired by another instance while we're processing a batch of events for this room @@ -93,7 +110,11 @@ export class StagingAreaService { private async processDependencyStage(event: EventStagingStore) { const eventId = event._id; - const eventIds = this.extractEventsFromIncomingPDU(event.event); + const [authEvents, prevEvents] = this.extractEventsFromIncomingPDU( + event.event, + ); + + const eventIds = [...authEvents, ...prevEvents]; this.logger.debug( `Checking dependencies for event ${eventId}: ${eventIds.length} references`, ); @@ -119,12 +140,18 @@ export class StagingAreaService { origin: event.origin, }); } + + // if the auth events are missing, the authorization stage will fail anyway, + // so to save some time we throw an error here, and the event processing will be postponed + if (authEvents.some(missing.includes)) { + throw new MissingAuthorizationEventsError('Missing authorization events'); + } } private async processAuthorizationStage(event: EventStagingStore) { this.logger.debug(`Authorizing event ${event._id}`); const authEvents = await this.eventService.getAuthEventIds( - 'm.room.message', + event.event.type, { roomId: event.event.room_id, senderId: event.event.sender }, );