Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/core/src/models/event.model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ export interface EventStore<E = Pdu> extends PersistentEventBase<E> {

export interface EventStagingStore extends PersistentEventBase {
roomId: string;

pendingInvite: boolean;
}
Comment on lines +29 to 31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Backfill or tolerant query needed for new required field.

Adding pendingInvite: boolean makes legacy staged docs (without this field) invisible to queries that match pendingInvite: false. Ensure downstream queries treat missing as false or run a one-off backfill/migration.

I’ve proposed tolerant query changes under event-staging.repository.ts.

🤖 Prompt for AI Agents
In packages/core/src/models/event.model.ts around lines 29 to 31, adding the new
required field `pendingInvite: boolean` will cause legacy documents that lack
this property to be excluded by queries that expect `pendingInvite: false`; to
fix, make the model tolerate missing values and/or backfill existing docs:
update the TypeScript model to allow the field to be optional (or provide a
default), add tolerant query logic in repositories (e.g., treat missing as false
via an `$or`/`$exists` check or include both `false` and `null`/missing in
filters), and run a one‑time migration to set `pendingInvite: false` on staged
documents so future queries can be simple and non-tolerant.


export interface FetchedEvents {
Expand Down
11 changes: 11 additions & 0 deletions packages/federation-sdk/src/container.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import {
MatrixBridgedRoom,
MatrixBridgedRoomRepository,
} from './repositories/matrix-bridged-room.repository';
import {
PendingInvite,
PendingInviteRepository,
} from './repositories/pending-invite.repository';
import { Room, RoomRepository } from './repositories/room.repository';
import { Server, ServerRepository } from './repositories/server.repository';
import { StateRepository, StateStore } from './repositories/state.repository';
Expand Down Expand Up @@ -104,6 +108,12 @@ export async function createFederationContainer(
},
);

container.register<Collection<PendingInvite>>('PendingInviteCollection', {
useValue: db.collection<PendingInvite>(
'rocketchat_federation_pending_invites',
),
});

container.registerSingleton(EventRepository);
container.registerSingleton(EventStagingRepository);
container.registerSingleton(KeyRepository);
Expand All @@ -117,6 +127,7 @@ export async function createFederationContainer(
container.registerSingleton(FederationRequestService);
container.registerSingleton(FederationService);
container.registerSingleton(StateService);
container.registerSingleton(PendingInviteRepository);
container.registerSingleton(EventService);
container.registerSingleton(EventFetcherService);
container.registerSingleton(EventAuthorizationService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export class EventStagingRepository {
eventId: EventID,
origin: string,
event: Pdu,
pendingInvite = false,
): Promise<UpdateResult> {
// We use an upsert here to handle the case where we see the same event
// from the same server multiple times.
Expand All @@ -32,6 +33,7 @@ export class EventStagingRepository {
},
$set: {
event,
pendingInvite,
},
},
{
Expand All @@ -48,6 +50,7 @@ export class EventStagingRepository {
return this.collection.findOne(
{
roomId,
pendingInvite: false,
},
{
sort: { createdAt: 1 },
Expand All @@ -58,4 +61,11 @@ export class EventStagingRepository {
async getDistinctStagedRooms(): Promise<string[]> {
return this.collection.distinct('roomId');
}

async unmarkInvitePending(eventId: EventID): Promise<UpdateResult> {
return this.collection.updateOne(
{ _id: eventId },
{ $set: { pendingInvite: false } },
);
}
Comment on lines +65 to +70
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Unmarking by eventId is insufficient; add a bulk clear by room+sender.

Multiple PDUs may be flagged. Provide an updateMany helper.

   async unmarkInvitePending(eventId: EventID): Promise<UpdateResult> {
     return this.collection.updateOne(
       { _id: eventId },
       { $set: { pendingInvite: false } },
     );
   }
+
+  async unmarkInvitePendingForRoomAndSender(
+    roomId: string,
+    sender: string,
+  ): Promise<UpdateResult> {
+    return this.collection.updateMany(
+      { roomId, 'event.sender': sender, pendingInvite: true },
+      { $set: { pendingInvite: false } },
+    );
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async unmarkInvitePending(eventId: EventID): Promise<UpdateResult> {
return this.collection.updateOne(
{ _id: eventId },
{ $set: { pendingInvite: false } },
);
}
async unmarkInvitePending(eventId: EventID): Promise<UpdateResult> {
return this.collection.updateOne(
{ _id: eventId },
{ $set: { pendingInvite: false } },
);
}
async unmarkInvitePendingForRoomAndSender(
roomId: string,
sender: string,
): Promise<UpdateResult> {
return this.collection.updateMany(
{ roomId, 'event.sender': sender, pendingInvite: true },
{ $set: { pendingInvite: false } },
);
}
🤖 Prompt for AI Agents
In packages/federation-sdk/src/repositories/event-staging.repository.ts around
lines 65 to 70, the current unmarkInvitePending method only updates a single
document by eventId which misses other PDUs flagged for the same invite; add a
new public method (e.g., unmarkInvitePendingByRoomAndSender or
unmarkPendingInvites) that calls collection.updateMany with a filter for roomId
and sender (and any other identifying fields like stateKey if applicable) and
sets pendingInvite to false, return the UpdateResult; keep existing updateOne
method if needed but use updateMany to clear all matching PDUs.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { EventID, Pdu } from '@hs/room';
import { Collection } from 'mongodb';
import { inject, singleton } from 'tsyringe';

export type PendingInvite = {
event: Pdu;
_id: EventID;
createdAt: Date;
};

@singleton()
export class PendingInviteRepository {
constructor(
@inject('PendingInviteCollection')
private readonly collection: Collection<PendingInvite>,
) {}

async add(eventId: EventID, event: Pdu): Promise<void> {
await this.collection.insertOne({
_id: eventId,
event,
createdAt: new Date(),
});
}
Comment on lines +18 to +24
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Make add idempotent.

Use upsert to tolerate retries.

-  async add(eventId: EventID, event: Pdu): Promise<void> {
-    await this.collection.insertOne({
-      _id: eventId,
-      event,
-      createdAt: new Date(),
-    });
-  }
+  async add(eventId: EventID, event: Pdu): Promise<void> {
+    await this.collection.updateOne(
+      { _id: eventId },
+      { $setOnInsert: { _id: eventId, event, createdAt: new Date() } },
+      { upsert: true },
+    );
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async add(eventId: EventID, event: Pdu): Promise<void> {
await this.collection.insertOne({
_id: eventId,
event,
createdAt: new Date(),
});
}
async add(eventId: EventID, event: Pdu): Promise<void> {
await this.collection.updateOne(
{ _id: eventId },
{ $setOnInsert: { _id: eventId, event, createdAt: new Date() } },
{ upsert: true },
);
}
🤖 Prompt for AI Agents
In packages/federation-sdk/src/repositories/pending-invite.repository.ts around
lines 18 to 24, the add method currently does an insertOne which will fail on
retries; make it idempotent by replacing insertOne with an updateOne (or
replace/upsert) using upsert: true and $setOnInsert for the event and createdAt
so repeated calls don’t error or overwrite existing records (e.g.,
updateOne({_id: eventId}, {$setOnInsert: {event, createdAt: new Date()}},
{upsert: true})).


async findByUserIdAndRoomId(
userId: string,
roomId: string,
): Promise<PendingInvite | null> {
return this.collection.findOne({
'event.type': 'm.room.member',
'event.state_key': userId,
'event.room_id': roomId,
});
}

async remove(eventId: EventID): Promise<void> {
await this.collection.deleteOne({ _id: eventId });
}
}
53 changes: 52 additions & 1 deletion packages/federation-sdk/src/services/event.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
type Pdu,
type PduForType,
type PduType,
PersistentEventBase,
PersistentEventFactory,
getAuthChain,
} from '@hs/room';
Expand All @@ -32,6 +33,7 @@ import { EventStagingRepository } from '../repositories/event-staging.repository
import { EventRepository } from '../repositories/event.repository';
import { KeyRepository } from '../repositories/key.repository';
import { LockRepository } from '../repositories/lock.repository';
import { PendingInviteRepository } from '../repositories/pending-invite.repository';
import { eventSchemas } from '../utils/event-schemas';
import { ConfigService } from './config.service';
import { EventEmitterService } from './event-emitter.service';
Expand Down Expand Up @@ -59,6 +61,8 @@ export class EventService {
private readonly stateService: StateService,

private readonly eventEmitterService: EventEmitterService,

private readonly pendingInviteRepository: PendingInviteRepository,
) {
// on startup we look for old staged events and try to process them
setTimeout(() => {
Expand Down Expand Up @@ -196,8 +200,22 @@ export class EventService {
continue;
}

const pendingInvite = await this.isSenderInvitePending(
event.sender,
event.room_id,
);

// save the event as staged to be processed
await this.eventStagingRepository.create(eventId, origin, event);
await this.eventStagingRepository.create(
eventId,
origin,
event,
pendingInvite,
);

if (pendingInvite) {
continue;
}

Comment on lines +203 to 219
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

🧩 Analysis chain

Pending-invite short-circuit: confirm join events aren’t blocked.

This will stage and skip any PDU from a sender with a pending invite. Ensure membership “join” events aren’t accidentally deferred, or they may never clear the block.

Run a quick scan to see if join PDUs enter this path:


🏁 Script executed:

#!/bin/bash
rg -nP --type=ts -C3 "processIncomingPDUs|persistStateEvent\(.+m\.room\.member.+join" packages

Length of output: 1134


🏁 Script executed:

#!/bin/bash
rg -n -C30 "isSenderInvitePending" packages/federation-sdk/src/services/event.service.ts

Length of output: 2991


Add join-event exemption to pending-invite short-circuit
In processIncomingPDUs (packages/federation-sdk/src/services/event.service.ts lines 203–219), you currently skip all PDUs when pendingInvite is true. That’ll also defer membership-“join” events and prevent removePendingInvite from ever running. Wrap the continue in a check so joins bypass the block, e.g.:

const isJoin = event.type === 'm.room.member'
  && (event.content as any).membership === 'join';
if (pendingInvite && !isJoin) {
  continue;
}

This ensures join events are processed immediately.

🤖 Prompt for AI Agents
In packages/federation-sdk/src/services/event.service.ts around lines 203-219,
the code short-circuits all PDUs when pendingInvite is true which also skips
processing of membership "join" events and prevents removePendingInvite from
running; modify the conditional so that you only continue when pendingInvite is
true AND the event is not a membership join — i.e., detect isJoin by checking
event.type === 'm.room.member' and event.content.membership === 'join' (or
equivalent) and only call continue when pendingInvite && !isJoin so join events
bypass the short-circuit and are processed immediately.

// acquire a lock for processing the event
const lock = await this.lockRepository.getLock(
Expand Down Expand Up @@ -806,4 +824,37 @@ export class EventService {
throw error;
}
}

async addPendingInvite(event: PersistentEventBase): Promise<void> {
await this.pendingInviteRepository.add(event.eventId, event.event as Pdu);
}

async removePendingInvite(eventId: EventID, roomId: string): Promise<void> {
await Promise.all([
this.pendingInviteRepository.remove(eventId),
this.eventStagingRepository.unmarkInvitePending(eventId),
]);

// acquire a lock for processing the event
const lock = await this.lockRepository.getLock(
roomId,
this.configService.instanceId,
);
if (!lock) {
this.logger.debug(`Couldn't acquire a lock for room ${roomId}`);
return;
}

// TODO change this to call stagingAreaService directly
this.stagingAreaQueue.enqueue(roomId);
}
Comment on lines +832 to +850
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix: remove by user+room and unmark all staged events.

Current implementation removes by eventId and unmarks a single staged doc, which doesn’t map to how invites and staged PDUs are keyed.

-  async removePendingInvite(eventId: EventID, roomId: string): Promise<void> {
-    await Promise.all([
-      this.pendingInviteRepository.remove(eventId),
-      this.eventStagingRepository.unmarkInvitePending(eventId),
-    ]);
-
-    // acquire a lock for processing the event
-    const lock = await this.lockRepository.getLock(
-      roomId,
-      this.configService.instanceId,
-    );
-    if (!lock) {
-      this.logger.debug(`Couldn't acquire a lock for room ${roomId}`);
-      return;
-    }
-
-    // TODO change this to call stagingAreaService directly
-    this.stagingAreaQueue.enqueue(roomId);
-  }
+  async resolvePendingInvitesForUser(roomId: string, userId: string): Promise<void> {
+    // Remove the invite entry (by user+room) if present
+    const invite = await this.pendingInviteRepository.findByUserIdAndRoomId(userId, roomId);
+    if (invite?._id) {
+      await this.pendingInviteRepository.remove(invite._id);
+    }
+    // Unblock all staged PDUs from this user in this room
+    await this.eventStagingRepository.unmarkInvitePendingForRoomAndSender(roomId, userId);
+
+    const lock = await this.lockRepository.getLock(roomId, this.configService.instanceId);
+    if (!lock) {
+      this.logger.debug(`Couldn't acquire a lock for room ${roomId}`);
+      return;
+    }
+    this.stagingAreaQueue.enqueue(roomId);
+  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async removePendingInvite(eventId: EventID, roomId: string): Promise<void> {
await Promise.all([
this.pendingInviteRepository.remove(eventId),
this.eventStagingRepository.unmarkInvitePending(eventId),
]);
// acquire a lock for processing the event
const lock = await this.lockRepository.getLock(
roomId,
this.configService.instanceId,
);
if (!lock) {
this.logger.debug(`Couldn't acquire a lock for room ${roomId}`);
return;
}
// TODO change this to call stagingAreaService directly
this.stagingAreaQueue.enqueue(roomId);
}
async resolvePendingInvitesForUser(roomId: string, userId: string): Promise<void> {
// Remove the invite entry (by user+room) if present
const invite = await this.pendingInviteRepository.findByUserIdAndRoomId(userId, roomId);
if (invite?._id) {
await this.pendingInviteRepository.remove(invite._id);
}
// Unblock all staged PDUs from this user in this room
await this.eventStagingRepository.unmarkInvitePendingForRoomAndSender(roomId, userId);
const lock = await this.lockRepository.getLock(roomId, this.configService.instanceId);
if (!lock) {
this.logger.debug(`Couldn't acquire a lock for room ${roomId}`);
return;
}
this.stagingAreaQueue.enqueue(roomId);
}


async isSenderInvitePending(sender: string, roomId: string) {
const invite = await this.pendingInviteRepository.findByUserIdAndRoomId(
sender,
roomId,
);

return !!invite;
}
}
17 changes: 8 additions & 9 deletions packages/federation-sdk/src/services/invite.service.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
import { EventBase, HttpException, HttpStatus } from '@hs/core';
import {
PduForType,
PersistentEventBase,
PersistentEventFactory,
RoomVersion,
} from '@hs/room';
import { PduForType, PersistentEventFactory, RoomVersion } from '@hs/room';
import { singleton } from 'tsyringe';
import { createLogger } from '../utils/logger';
import { ConfigService } from './config.service';
Expand Down Expand Up @@ -70,7 +65,7 @@ export class InviteService {
sender: sender,
},

roomInformation.room_version,
roomInformation.room_version as RoomVersion,
);

// SPEC: Invites a remote user to a room. Once the event has been signed by both the inviting homeserver and the invited homeserver, it can be sent to all of the servers in the room by the inviting homeserver.
Expand Down Expand Up @@ -113,7 +108,7 @@ export class InviteService {
await stateService.persistStateEvent(
PersistentEventFactory.createFromRawEvent(
inviteResponse.event,
roomInformation.room_version,
roomInformation.room_version as RoomVersion,
),
);

Expand Down Expand Up @@ -185,7 +180,11 @@ export class InviteService {
}

// we are not the host of the server
// so being the origin of the user, we sign the event and send it to the asking server, let them handle the transactions
// nor are we part of the room now.

await this.eventService.addPendingInvite(inviteEvent);

Comment on lines +183 to +186
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Idempotency: guard against duplicate inserts.

addPendingInvite ultimately does insertOne(_id=inviteEventId). Retries can raise E11000. Use upsert in the repo or catch/ignore duplicate key errors.

Apply in repository (see pending-invite.repository.ts comment) or wrap call:

- await this.eventService.addPendingInvite(inviteEvent);
+ try {
+   await this.eventService.addPendingInvite(inviteEvent);
+ } catch (e: any) {
+   if (!/E11000/.test(String(e?.code) + String(e?.message))) throw e;
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// nor are we part of the room now.
await this.eventService.addPendingInvite(inviteEvent);
// nor are we part of the room now.
try {
await this.eventService.addPendingInvite(inviteEvent);
} catch (e: any) {
if (!/E11000/.test(String(e?.code) + String(e?.message))) throw e;
}
🤖 Prompt for AI Agents
In packages/federation-sdk/src/services/invite.service.ts around lines 183 to
186, the call to this.eventService.addPendingInvite(inviteEvent) can cause
duplicate-key E11000 on retries because addPendingInvite does an insertOne;
modify the repository to perform an upsert (insert-or-update) for the
pending-invite record or change addPendingInvite to catch duplicate key errors
and ignore them (only rethrow non-duplicate errors). Ensure the chosen fix is
applied in pending-invite.repository.ts (prefer upsert with the invite ID as the
filter) or wrap the service call in a try/catch that suppresses E11000.

// being the origin of the user, we sign the event and send it to the asking server, let them handle the transactions
return inviteEvent;
}
}
5 changes: 5 additions & 0 deletions packages/federation-sdk/src/services/room.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,11 @@ export class RoomService {
throw new Error(joinEventFinal.rejectedReason);
}

await this.eventService.removePendingInvite(
joinEventFinal.eventId,
joinEventFinal.roomId,
);
Comment on lines +992 to +995
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Bug: wrong identifier used to clear pending invite and unblock staged events.

removePendingInvite(joinEventFinal.eventId, …) won’t remove the pending invite (stored by inviteEventId) and won’t unmark staged events (flagged by sender+room). Clear by userId+roomId and unmark all staged events for that pair.

Apply:

-    await this.eventService.removePendingInvite(
-      joinEventFinal.eventId,
-      joinEventFinal.roomId,
-    );
+    await this.eventService.resolvePendingInvitesForUser(
+      joinEventFinal.roomId,
+      userId,
+    );

Supporting changes are proposed in event.service.ts and event-staging.repository.ts comments below.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
await this.eventService.removePendingInvite(
joinEventFinal.eventId,
joinEventFinal.roomId,
);
await this.eventService.resolvePendingInvitesForUser(
joinEventFinal.roomId,
userId,
);
🤖 Prompt for AI Agents
In packages/federation-sdk/src/services/room.service.ts around lines 992 to 995,
the call to removePendingInvite uses joinEventFinal.eventId which is the wrong
identifier; change it to remove the pending invite by userId and roomId (e.g.,
joinEventFinal.userId, joinEventFinal.roomId) and ensure you also call or extend
the event staging cleanup to unmark all staged events for that sender+room pair
so staged events are cleared for that user in that room; adapt the call
signature and invocation to pass userId+roomId and trigger unmarking of all
staged events for that pair (see related suggested changes in event.service.ts
and event-staging.repository.ts).


return joinEventFinal.eventId;
}

Expand Down