Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export class StagingAreaListener {
}

async handleQueueItem(data: string) {
this.logger.debug(`Processing event ${data}`);
this.logger.debug(`Processing room ${data}`);
await this.stagingAreaService.processEventForRoom(data);
}
}
1 change: 1 addition & 0 deletions packages/federation-sdk/src/services/event.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export class EventService {
async checkIfEventsExists(
eventIds: EventID[],
): Promise<{ missing: EventID[]; found: EventID[] }> {
// TODO, return only the IDs, not the full events
const eventsCursor = this.eventRepository.findByIds(eventIds);
const events = await eventsCursor.toArray();

Expand Down
26 changes: 26 additions & 0 deletions packages/federation-sdk/src/services/federation.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { EventBase } from '@rocket.chat/federation-core';
import type { BaseEDU } from '@rocket.chat/federation-core';
import { createLogger } from '@rocket.chat/federation-core';
import {
EventID,
Pdu,
PersistentEventBase,
PersistentEventFactory,
Expand Down Expand Up @@ -155,6 +156,31 @@ export class FederationService {
}
}

/**
* Get events from a remote server
*/
async getMissingEvents(
domain: string,
roomId: string,
earliestEvents: EventID[],
latestEvents: EventID[],
limit = 10,
minDepth = 0,
): Promise<{ events: Pdu[] }> {
try {
const uri = FederationEndpoints.getMissingEvents(roomId);
return await this.requestService.post<{ events: Pdu[] }>(domain, uri, {
earliest_events: earliestEvents,
latest_events: latestEvents,
limit,
min_depth: minDepth,
});
} catch (error: any) {
this.logger.error({ msg: 'getEvent failed', err: error });
throw error;
}
}

/**
* Get state for a room from remote server
*/
Expand Down
88 changes: 72 additions & 16 deletions packages/federation-sdk/src/services/staging-area.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { EventService } from './event.service';

import { LockRepository } from '../repositories/lock.repository';
import { ConfigService } from './config.service';
import { FederationService } from './federation.service';
import { MissingEventService } from './missing-event.service';
import { PartialStateResolutionError, StateService } from './state.service';

Expand All @@ -32,6 +33,13 @@ class MissingAuthorizationEventsError extends Error {
}
}

class MissingEventsError extends Error {
constructor(message: string) {
super(message);
this.name = 'MissingEventsError';
}
}

@singleton()
export class StagingAreaService {
private readonly logger = createLogger('StagingAreaService');
Expand All @@ -43,6 +51,7 @@ export class StagingAreaService {
private readonly eventAuthService: EventAuthorizationService,
private readonly eventEmitterService: EventEmitterService,
private readonly stateService: StateService,
private readonly federationService: FederationService,
private readonly lockRepository: LockRepository,
) {}

Expand Down Expand Up @@ -83,7 +92,13 @@ export class StagingAreaService {
this.logger.info({ msg: 'Processing event', eventId: event._id });

try {
await this.processDependencyStage(event);
const addedMissing = await this.processDependencyStage(event);
if (addedMissing) {
// if we added missing events, we postpone the processing of this event
// to give time for the missing events to be processed first
throw new MissingEventsError('Added missing events');
}

if ('from' in event && event.from !== 'join') {
await this.processAuthorizationStage(event);
}
Expand All @@ -106,6 +121,11 @@ export class StagingAreaService {
eventId: event._id,
err,
});
} else if (err instanceof MissingEventsError) {
this.logger.info({
msg: 'Added missing events, postponing current event processing',
eventId: event._id,
});
} else {
this.logger.error({
msg: 'Error processing event',
Expand Down Expand Up @@ -155,31 +175,67 @@ export class StagingAreaService {
);

if (missing.length === 0) {
return;
return false;
}
this.logger.debug(`Missing ${missing.length} events for ${eventId}`);

const found = await Promise.all(
missing.map((missingId) => {
this.logger.debug(
`Adding missing event ${missingId} to missing events service`,
);
this.logger.debug(
`Missing ${missing.length} events for ${eventId}: ${missing}`,
);

return this.missingEventsService.fetchMissingEvent({
eventId: missingId,
roomId: event.event.room_id,
origin: event.origin,
});
}),
const latestEvent = await this.eventService.getLastEventForRoom(
event.event.room_id,
);

const addedMissing = found.some((f) => f === true);
let addedMissing = false;

if (latestEvent) {
this.logger.debug(
`Fetching missing events between ${latestEvent._id} and ${eventId} for room ${event.event.room_id}`,
);

const missingEvents = await this.federationService.getMissingEvents(
event.origin,
event.event.room_id,
[latestEvent._id],
[eventId],
10,
0,
);

this.logger.debug(
`Persisting ${missingEvents.events.length} fetched missing events`,
);

await this.eventService.processIncomingPDUs(
event.origin,
missingEvents.events,
);

addedMissing = missingEvents.events.length > 0;
Comment on lines +195 to +213
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Handle remote fetch failures without dropping the staged event.

If federationService.getMissingEvents throws (network error, 4xx/5xx, etc.), the exception bubbles up to processEventForRoom, falls into the generic else branch, logs an error, and the event is un-staged via eventService.markEventAsUnstaged(event). That means we permanently discard the event just because fetching its dependencies failed once.

We should avoid unstaging in this case—either catch the error here and return false (so processing is retried later) or rethrow a dedicated error handled like the other postponement cases.

🤖 Prompt for AI Agents
In packages/federation-sdk/src/services/staging-area.service.ts around lines 195
to 213, the call to this.federationService.getMissingEvents can throw and
currently bubbles up causing the event to be un-staged; wrap the
getMissingEvents call (and subsequent processing of fetched events) in a
try/catch, and on any error catch it, log a debug/error message including the
error, and return false so the staged event is left in place for retry instead
of calling eventService.markEventAsUnstaged(event); ensure normal success path
still processes and marks addedMissing as before.

} else {
Comment on lines +208 to +214
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Prevent re-staging the current event via processIncomingPDUs.

When /get_missing_events includes the event we’re currently processing (allowed by spec), processIncomingPDUs does not find it in the persistent store and stages it again. That creates a duplicate entry for the same event and can cause an infinite loop. Before calling processIncomingPDUs, filter out the current eventId (and any other events we already staged) so we only enqueue genuinely missing ones.

🤖 Prompt for AI Agents
packages/federation-sdk/src/services/staging-area.service.ts lines ~208-214:
before calling this.eventService.processIncomingPDUs(event.origin,
missingEvents.events) filter missingEvents.events to remove the current eventId
(the event we are processing) and any event IDs already staged/persisted so we
only pass genuinely missing events; then call processIncomingPDUs with the
filtered list and compute addedMissing from the filtered length. Ensure you
check the actual event id field name used in this context and use the
staging/persistent store lookup to exclude already-staged IDs.

const found = await Promise.all(
missing.map((missingId) => {
this.logger.debug(
`Adding missing event ${missingId} to missing events service`,
);

return this.missingEventsService.fetchMissingEvent({
eventId: missingId,
roomId: event.event.room_id,
origin: event.origin,
});
}),
);

addedMissing = found.some((f) => f === true);
}

// if the auth events are missing, the authorization stage will fail anyway,
// so to save some time we throw an error here, and the event processing will be postponed
if (addedMissing && authEvents.some((e) => missing.includes(e))) {
throw new MissingAuthorizationEventsError('Missing authorization events');
}

return addedMissing;
}

private async processAuthorizationStage(event: EventStagingStore) {
Expand Down
2 changes: 1 addition & 1 deletion packages/federation-sdk/src/services/state.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ export class StateService {
'event authorized against auth events',
);

// 5. Passes authorization rules based on the state before the event, otherwise it is rejected.
// 5. Passes authorization rules based on the state before the event and store event, otherwise it is rejected.
await this._resolveStateAtEvent(event); // it is the assumption that this point forwards this event WILL have a state associated with it

/*
Expand Down