Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,6 @@ export class EventStagingRepository {
return this.collection.deleteOne({ _id: eventId });
}

getNextStagedEventForRoom(roomId: string): Promise<EventStagingStore | null> {
return this.collection.findOne(
{
roomId,
},
{
sort: { createdAt: 1 },
},
);
}

async getDistinctStagedRooms(): Promise<string[]> {
return this.collection.distinct('roomId');
}
Expand Down
10 changes: 2 additions & 8 deletions packages/federation-sdk/src/services/event.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,6 @@ export class EventService {
return this.eventStagingRepository.getLeastDepthEventForRoom(roomId);
}

async getNextStagedEventForRoom(
roomId: string,
): Promise<EventStagingStore | null> {
return this.eventStagingRepository.getNextStagedEventForRoom(roomId);
}

/**
* Mark an event as no longer staged
*/
Expand Down Expand Up @@ -820,9 +814,9 @@ export class EventService {

/**
* A transaction containing the PDUs that preceded the given event(s), including the given event(s), up to the given limit.
*
*
* Note: Though the PDU definitions require that prev_events and auth_events be limited in number, the response of backfill MUST NOT be validated on these specific restrictions.
*
*
* Due to historical reasons, it is possible that events which were previously accepted would now be rejected by these limitations. The events should be rejected per usual by the /send, /get_missing_events, and remaining endpoints.

*/
Expand Down
9 changes: 6 additions & 3 deletions packages/federation-sdk/src/services/missing-event.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ export class MissingEventService {
private readonly eventFetcherService: EventFetcherService,
) {}

async fetchMissingEvent(data: MissingEventType) {
async fetchMissingEvent(data: MissingEventType): Promise<boolean> {
const { eventId, roomId, origin } = data;

const exists = await this.eventService.getEventById(eventId);
if (exists) {
this.logger.debug(
`Event ${eventId} already exists in database (staged or processed), marking as fetched`,
);
return;
return true;
}

try {
Expand All @@ -43,7 +43,7 @@ export class MissingEventService {
this.logger.warn(
`Failed to fetch missing event ${eventId} from ${origin}`,
);
return;
return false;
}

for (const { event, eventId } of fetchedEvents.events) {
Expand All @@ -52,10 +52,13 @@ export class MissingEventService {
// TODO is there anything else we need to do with missing dependencies from received event?
await this.eventService.processIncomingPDUs(origin, [event]);
}

return true;
} catch (err: unknown) {
this.logger.error(
`Error fetching missing event ${eventId}: ${err instanceof Error ? err.message : String(err)}`,
);
return false;
}
}
}
30 changes: 18 additions & 12 deletions packages/federation-sdk/src/services/staging-area.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,11 @@ export class StagingAreaService {
} else {
this.logger.error({
msg: 'Error processing event',
event,
err,
});

await this.eventService.markEventAsUnstaged(event);
}
}

Expand Down Expand Up @@ -127,23 +130,26 @@ export class StagingAreaService {
return;
}
this.logger.debug(`Missing ${missing.length} events for ${eventId}`);
// trackedEvent.missingEvents = missing;

for (const missingId of missing) {
this.logger.debug(
`Adding missing event ${missingId} to missing events service`,
);
const found = await Promise.all(
missing.map((missingId) => {
this.logger.debug(
`Adding missing event ${missingId} to missing events service`,
);

await this.missingEventsService.fetchMissingEvent({
eventId: missingId,
roomId: event.event.room_id,
origin: event.origin,
});
}
return this.missingEventsService.fetchMissingEvent({
eventId: missingId,
roomId: event.event.room_id,
origin: event.origin,
});
}),
);

const addedMissing = found.some((f) => f === true);

// if the auth events are missing, the authorization stage will fail anyway,
// so to save some time we throw an error here, and the event processing will be postponed
if (authEvents.some((e) => missing.includes(e))) {
if (addedMissing && authEvents.some((e) => missing.includes(e))) {
throw new MissingAuthorizationEventsError('Missing authorization events');
}
}
Expand Down