Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/core/src/models/event.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export interface EventStore<E = Pdu> extends PersistentEventBase<E> {

export interface EventStagingStore extends PersistentEventBase {
roomId: string;
got: number;
from: 'join' | 'transaction';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@ export class EventStagingRepository {
@inject('EventStagingCollection')
private readonly collection: Collection<EventStagingStore>,
) {
this.collection.createIndex({ roomId: 1, createdAt: 1 });
this.collection.createIndex({
roomId: 1,
got: 1,
'event.depth': 1,
createdAt: 1,
});
}

async create(
Expand All @@ -30,6 +35,7 @@ export class EventStagingRepository {
$setOnInsert: {
roomId: event.room_id,
createdAt: new Date(),
got: 0,
},
$set: {
event,
Expand All @@ -43,9 +49,18 @@ export class EventStagingRepository {
}

getLeastDepthEventForRoom(roomId: string): Promise<EventStagingStore | null> {
return this.collection.findOne(
return this.collection.findOneAndUpdate(
{ roomId },
{ sort: { 'event.depth': 1, createdAt: 1 } },
{
$inc: {
got: 1,
},
},
{
sort: { got: 1, 'event.depth': 1, createdAt: 1 },
upsert: false,
returnDocument: 'before',
},
);
}

Expand Down
2 changes: 1 addition & 1 deletion packages/federation-sdk/src/server-discovery/discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const SERVER_DISCOVERY_CACHE_MAX_AGE =
}

throw new Error('Invalid SERVER_DISCOVERY_CACHE_MAX_AGE value');
})(process.env.SERVER_DISCOVERY_CACHE_MAX_AGE) ?? 3_600_000; // default to 1 hour
})(process.env.SERVER_DISCOVERY_CACHE_MAX_AGE) ?? 300_000; // default to 5 minutes

// should only be needed if input is from a dns server
function fix6(addr: string): `[${string}]` {
Expand Down
68 changes: 39 additions & 29 deletions packages/federation-sdk/src/services/staging-area.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ import { FederationService } from './federation.service';
import { MissingEventService } from './missing-event.service';
import { PartialStateResolutionError, StateService } from './state.service';

const MAX_EVENT_RETRY =
((maxRetry?: string) => {
if (!maxRetry) return;

const n = Number.parseInt(maxRetry, 10);
if (!Number.isNaN(n) && n >= 0) {
return n;
}

throw new Error('Invalid MAX_EVENT_RETRY value');
})(process.env.MAX_EVENT_RETRY) ?? 10;

class MissingAuthorizationEventsError extends Error {
constructor(message: string) {
super(message);
Expand Down Expand Up @@ -62,16 +74,6 @@ export class StagingAreaService {
}

async processEventForRoom(roomId: string) {
let event = await this.eventService.getLeastDepthEventForRoom(roomId);
if (!event) {
this.logger.debug({ msg: 'No staged event found for room', roomId });
await this.lockRepository.releaseLock(
roomId,
this.configService.instanceId,
);
return;
}

const roomIdToRoomVersion = new Map<string, RoomVersion>();
const getRoomVersion = async (roomId: string) => {
if (roomIdToRoomVersion.has(roomId)) {
Expand All @@ -88,9 +90,32 @@ export class StagingAreaService {
return PersistentEventFactory.createFromRawEvent(pdu, version);
};

while (event) {
let event: EventStagingStore | null = null;

do {
event = await this.eventService.getLeastDepthEventForRoom(roomId);
if (!event) {
this.logger.debug({ msg: 'No staged event found for room', roomId });
break;
}

if (event.got > MAX_EVENT_RETRY) {
this.logger.warn(
`Event ${event._id} has been tried ${MAX_EVENT_RETRY} times, removing from staging area`,
);
await this.eventService.markEventAsUnstaged(event);
continue;
}

this.logger.info({ msg: 'Processing event', eventId: event._id });

// if we got an event, we need to update the lock's timestamp to avoid it being timed out
// and acquired by another instance while we're processing a batch of events for this room
await this.lockRepository.updateLockTimestamp(
roomId,
this.configService.instanceId,
);

try {
const addedMissing = await this.processDependencyStage(event);
if (addedMissing) {
Expand Down Expand Up @@ -123,33 +148,18 @@ export class StagingAreaService {
});
} else if (err instanceof MissingEventsError) {
this.logger.info({
msg: 'Added missing events, postponing current event processing',
msg: 'Added missing events, postponing event processing',
eventId: event._id,
});
} else {
this.logger.error({
msg: 'Error processing event',
msg: 'Error processing event, postponing event processing',
event,
err,
});

await this.eventService.markEventAsUnstaged(event);
}
}

// TODO: what should we do to avoid infinite loops in case the next event is always the same event

event = await this.eventService.getLeastDepthEventForRoom(roomId);

// if we got an event, we need to update the lock's timestamp to avoid it being timed out
// and acquired by another instance while we're processing a batch of events for this room
if (event) {
await this.lockRepository.updateLockTimestamp(
roomId,
this.configService.instanceId,
);
}
}
} while (event);

// release the lock after processing
await this.lockRepository.releaseLock(
Expand Down