diff --git a/packages/federation-sdk/src/listeners/staging-area.listener.ts b/packages/federation-sdk/src/listeners/staging-area.listener.ts index 49bf09bee..f3ac7519f 100644 --- a/packages/federation-sdk/src/listeners/staging-area.listener.ts +++ b/packages/federation-sdk/src/listeners/staging-area.listener.ts @@ -15,7 +15,7 @@ export class StagingAreaListener { } async handleQueueItem(data: string) { - this.logger.debug(`Processing event ${data}`); + this.logger.debug(`Processing room ${data}`); await this.stagingAreaService.processEventForRoom(data); } } diff --git a/packages/federation-sdk/src/services/event.service.ts b/packages/federation-sdk/src/services/event.service.ts index cb99a3333..0c2efd404 100644 --- a/packages/federation-sdk/src/services/event.service.ts +++ b/packages/federation-sdk/src/services/event.service.ts @@ -78,6 +78,7 @@ export class EventService { async checkIfEventsExists( eventIds: EventID[], ): Promise<{ missing: EventID[]; found: EventID[] }> { + // TODO, return only the IDs, not the full events const eventsCursor = this.eventRepository.findByIds(eventIds); const events = await eventsCursor.toArray(); diff --git a/packages/federation-sdk/src/services/federation.service.ts b/packages/federation-sdk/src/services/federation.service.ts index 7fed0c33f..f6f1a7b80 100644 --- a/packages/federation-sdk/src/services/federation.service.ts +++ b/packages/federation-sdk/src/services/federation.service.ts @@ -2,6 +2,7 @@ import type { EventBase } from '@rocket.chat/federation-core'; import type { BaseEDU } from '@rocket.chat/federation-core'; import { createLogger } from '@rocket.chat/federation-core'; import { + EventID, Pdu, PersistentEventBase, PersistentEventFactory, @@ -155,6 +156,31 @@ export class FederationService { } } + /** + * Get events from a remote server + */ + async getMissingEvents( + domain: string, + roomId: string, + earliestEvents: EventID[], + latestEvents: EventID[], + limit = 10, + minDepth = 0, + ): Promise<{ events: Pdu[] }> { + try { + const uri = FederationEndpoints.getMissingEvents(roomId); + return await this.requestService.post<{ events: Pdu[] }>(domain, uri, { + earliest_events: earliestEvents, + latest_events: latestEvents, + limit, + min_depth: minDepth, + }); + } catch (error: any) { + this.logger.error({ msg: 'getEvent failed', err: error }); + throw error; + } + } + /** * Get state for a room from remote server */ diff --git a/packages/federation-sdk/src/services/staging-area.service.ts b/packages/federation-sdk/src/services/staging-area.service.ts index 8e5bb7b50..834d24199 100644 --- a/packages/federation-sdk/src/services/staging-area.service.ts +++ b/packages/federation-sdk/src/services/staging-area.service.ts @@ -22,6 +22,7 @@ import { EventService } from './event.service'; import { LockRepository } from '../repositories/lock.repository'; import { ConfigService } from './config.service'; +import { FederationService } from './federation.service'; import { MissingEventService } from './missing-event.service'; import { PartialStateResolutionError, StateService } from './state.service'; @@ -32,6 +33,13 @@ class MissingAuthorizationEventsError extends Error { } } +class MissingEventsError extends Error { + constructor(message: string) { + super(message); + this.name = 'MissingEventsError'; + } +} + @singleton() export class StagingAreaService { private readonly logger = createLogger('StagingAreaService'); @@ -43,6 +51,7 @@ export class StagingAreaService { private readonly eventAuthService: EventAuthorizationService, private readonly eventEmitterService: EventEmitterService, private readonly stateService: StateService, + private readonly federationService: FederationService, private readonly lockRepository: LockRepository, ) {} @@ -83,7 +92,13 @@ export class StagingAreaService { this.logger.info({ msg: 'Processing event', eventId: event._id }); try { - await this.processDependencyStage(event); + const addedMissing = await this.processDependencyStage(event); + if (addedMissing) { + // if we added missing events, we postpone the processing of this event + // to give time for the missing events to be processed first + throw new MissingEventsError('Added missing events'); + } + if ('from' in event && event.from !== 'join') { await this.processAuthorizationStage(event); } @@ -106,6 +121,11 @@ export class StagingAreaService { eventId: event._id, err, }); + } else if (err instanceof MissingEventsError) { + this.logger.info({ + msg: 'Added missing events, postponing current event processing', + eventId: event._id, + }); } else { this.logger.error({ msg: 'Error processing event', @@ -155,31 +175,67 @@ export class StagingAreaService { ); if (missing.length === 0) { - return; + return false; } - this.logger.debug(`Missing ${missing.length} events for ${eventId}`); - - const found = await Promise.all( - missing.map((missingId) => { - this.logger.debug( - `Adding missing event ${missingId} to missing events service`, - ); + this.logger.debug( + `Missing ${missing.length} events for ${eventId}: ${missing}`, + ); - return this.missingEventsService.fetchMissingEvent({ - eventId: missingId, - roomId: event.event.room_id, - origin: event.origin, - }); - }), + const latestEvent = await this.eventService.getLastEventForRoom( + event.event.room_id, ); - const addedMissing = found.some((f) => f === true); + let addedMissing = false; + + if (latestEvent) { + this.logger.debug( + `Fetching missing events between ${latestEvent._id} and ${eventId} for room ${event.event.room_id}`, + ); + + const missingEvents = await this.federationService.getMissingEvents( + event.origin, + event.event.room_id, + [latestEvent._id], + [eventId], + 10, + 0, + ); + + this.logger.debug( + `Persisting ${missingEvents.events.length} fetched missing events`, + ); + + await this.eventService.processIncomingPDUs( + event.origin, + missingEvents.events, + ); + + addedMissing = missingEvents.events.length > 0; + } else { + const found = await Promise.all( + missing.map((missingId) => { + this.logger.debug( + `Adding missing event ${missingId} to missing events service`, + ); + + return this.missingEventsService.fetchMissingEvent({ + eventId: missingId, + roomId: event.event.room_id, + origin: event.origin, + }); + }), + ); + + addedMissing = found.some((f) => f === true); + } // if the auth events are missing, the authorization stage will fail anyway, // so to save some time we throw an error here, and the event processing will be postponed if (addedMissing && authEvents.some((e) => missing.includes(e))) { throw new MissingAuthorizationEventsError('Missing authorization events'); } + + return addedMissing; } private async processAuthorizationStage(event: EventStagingStore) { diff --git a/packages/federation-sdk/src/services/state.service.ts b/packages/federation-sdk/src/services/state.service.ts index 15c498180..faf550f15 100644 --- a/packages/federation-sdk/src/services/state.service.ts +++ b/packages/federation-sdk/src/services/state.service.ts @@ -645,7 +645,7 @@ export class StateService { 'event authorized against auth events', ); - // 5. Passes authorization rules based on the state before the event, otherwise it is rejected. + // 5. Passes authorization rules based on the state before the event and store event, otherwise it is rejected. await this._resolveStateAtEvent(event); // it is the assumption that this point forwards this event WILL have a state associated with it /*