Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ export class EventStagingRepository {
);
}

getLeastDepthEventForRoom(roomId: string): Promise<EventStagingStore | null> {
return this.collection.findOne(
{ roomId },
{ sort: { 'event.depth': 1, createdAt: 1 } },
);
}

removeByEventId(eventId: EventID): Promise<DeleteResult> {
return this.collection.deleteOne({ _id: eventId });
}
Expand Down
30 changes: 25 additions & 5 deletions packages/federation-sdk/src/services/event.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ export class EventService {
);
}

async getLeastDepthEventForRoom(
roomId: string,
): Promise<EventStagingStore | null> {
return this.eventStagingRepository.getLeastDepthEventForRoom(roomId);
}

async getNextStagedEventForRoom(
roomId: string,
): Promise<EventStagingStore | null> {
Expand Down Expand Up @@ -154,10 +160,7 @@ export class EventService {
}
}

private async processIncomingPDUs(
origin: string,
pdus: Pdu[],
): Promise<void> {
async processIncomingPDUs(origin: string, pdus: Pdu[]): Promise<void> {
// organize events by room id
const eventsByRoomId = new Map<string, Pdu[]>();
for (const event of pdus) {
Expand Down Expand Up @@ -265,7 +268,24 @@ export class EventService {
throw new Error('M_MISSING_SIGNATURES_OR_HASHES');
}

await checkSignAndHashes(event, origin, getPublicKeyFromServer);
let originToValidateSignatures = origin;

// If the event does not have a signature for the origin server,
// but has a signature for our server, we validate using our server name.
// This happens on sendJoin process, where the join event is signed by our server,
// but the origin is the remote server since it just returns the event as we sent it.
if (
!event.signatures[origin] &&
event.signatures[this.configService.serverName]
) {
originToValidateSignatures = this.configService.serverName;
}

await checkSignAndHashes(
event,
originToValidateSignatures,
getPublicKeyFromServer,
);
}

private async processIncomingEDUs(edus: BaseEDU[]): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class MissingEventService {
this.logger.debug(`Persisting fetched missing event ${eventId}`);

// TODO is there anything else we need to do with missing dependencies from received event?
await this.stateService.persistEvent(event);
await this.eventService.processIncomingPDUs(origin, [event]);
}
} catch (err: unknown) {
this.logger.error(
Expand Down
5 changes: 4 additions & 1 deletion packages/federation-sdk/src/services/room.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,10 @@ export class RoomService {
);

// try to persist the join event now, should succeed with state in place
await stateService.persistStateEvent(joinEventFinal);
await this.eventService.processIncomingPDUs(
residentServer || joinEventFinal.origin,
[joinEventFinal.event],
);

if (joinEventFinal.rejected) {
throw new Error(joinEventFinal.rejectedReason);
Expand Down
45 changes: 36 additions & 9 deletions packages/federation-sdk/src/services/staging-area.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ import { ConfigService } from './config.service';
import { MissingEventService } from './missing-event.service';
import { StateService } from './state.service';

class MissingAuthorizationEventsError extends Error {
constructor(message: string) {
super(message);
this.name = 'MissingAuthorizationEventsError';
}
}

@singleton()
export class StagingAreaService {
private readonly logger = createLogger('StagingAreaService');
Expand All @@ -38,11 +45,11 @@ export class StagingAreaService {
extractEventsFromIncomingPDU(pdu: EventBase) {
const authEvents = pdu.auth_events || [];
const prevEvents = pdu.prev_events || [];
return [...authEvents, ...prevEvents];
return [authEvents, prevEvents];
}

async processEventForRoom(roomId: string) {
let event = await this.eventService.getNextStagedEventForRoom(roomId);
let event = await this.eventService.getLeastDepthEventForRoom(roomId);
if (!event) {
this.logger.debug({ msg: 'No staged event found for room', roomId });
await this.lockRepository.releaseLock(
Expand All @@ -65,13 +72,23 @@ export class StagingAreaService {

// TODO add missing logic from synapse: Prune the event queue if it's getting large.
} catch (err: unknown) {
this.logger.error({
msg: 'Error processing event',
err,
});
if (err instanceof MissingAuthorizationEventsError) {
this.logger.info({
msg: 'Missing events, postponing event processing',
eventId: event._id,
err,
});
} else {
this.logger.error({
msg: 'Error processing event',
err,
});
}
}

event = await this.eventService.getNextStagedEventForRoom(roomId);
// TODO: what should we do to avoid infinite loops in case the next event is always the same event

Comment on lines +89 to +90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Critical: Address the TODO about infinite loops.

The TODO comment raises a valid concern about potential infinite loops if the same event keeps being returned. This could happen if an event permanently lacks its authorization events.

Consider implementing one of these strategies:

  1. Track retry attempts per event and skip after a threshold
  2. Implement exponential backoff for events that repeatedly fail
  3. Move permanently failing events to a dead letter queue

Would you like me to open an issue to track implementing a retry limit mechanism?

event = await this.eventService.getLeastDepthEventForRoom(roomId);

// if we got an event, we need to update the lock's timestamp to avoid it being timed out
// and acquired by another instance while we're processing a batch of events for this room
Expand All @@ -93,7 +110,11 @@ export class StagingAreaService {
private async processDependencyStage(event: EventStagingStore) {
const eventId = event._id;

const eventIds = this.extractEventsFromIncomingPDU(event.event);
const [authEvents, prevEvents] = this.extractEventsFromIncomingPDU(
event.event,
);

const eventIds = [...authEvents, ...prevEvents];
this.logger.debug(
`Checking dependencies for event ${eventId}: ${eventIds.length} references`,
);
Expand All @@ -119,12 +140,18 @@ export class StagingAreaService {
origin: event.origin,
});
}

// if the auth events are missing, the authorization stage will fail anyway,
// so to save some time we throw an error here, and the event processing will be postponed
if (authEvents.some(missing.includes)) {
throw new MissingAuthorizationEventsError('Missing authorization events');
}
}

private async processAuthorizationStage(event: EventStagingStore) {
this.logger.debug(`Authorizing event ${event._id}`);
const authEvents = await this.eventService.getAuthEventIds(
'm.room.message',
event.event.type,
{ roomId: event.event.room_id, senderId: event.event.sender },
);

Expand Down