diff --git a/packages/core/src/models/event.model.ts b/packages/core/src/models/event.model.ts index 34b2b6885..38fd9ada1 100644 --- a/packages/core/src/models/event.model.ts +++ b/packages/core/src/models/event.model.ts @@ -26,6 +26,8 @@ export interface EventStore extends PersistentEventBase { export interface EventStagingStore extends PersistentEventBase { roomId: string; + + pendingInvite: boolean; } export interface FetchedEvents { diff --git a/packages/federation-sdk/src/container.ts b/packages/federation-sdk/src/container.ts index dad661899..382f44e90 100644 --- a/packages/federation-sdk/src/container.ts +++ b/packages/federation-sdk/src/container.ts @@ -16,6 +16,10 @@ import { MatrixBridgedRoom, MatrixBridgedRoomRepository, } from './repositories/matrix-bridged-room.repository'; +import { + PendingInvite, + PendingInviteRepository, +} from './repositories/pending-invite.repository'; import { Room, RoomRepository } from './repositories/room.repository'; import { Server, ServerRepository } from './repositories/server.repository'; import { StateRepository, StateStore } from './repositories/state.repository'; @@ -104,6 +108,12 @@ export async function createFederationContainer( }, ); + container.register>('PendingInviteCollection', { + useValue: db.collection( + 'rocketchat_federation_pending_invites', + ), + }); + container.registerSingleton(EventRepository); container.registerSingleton(EventStagingRepository); container.registerSingleton(KeyRepository); @@ -117,6 +127,7 @@ export async function createFederationContainer( container.registerSingleton(FederationRequestService); container.registerSingleton(FederationService); container.registerSingleton(StateService); + container.registerSingleton(PendingInviteRepository); container.registerSingleton(EventService); container.registerSingleton(EventFetcherService); container.registerSingleton(EventAuthorizationService); diff --git a/packages/federation-sdk/src/repositories/event-staging.repository.ts b/packages/federation-sdk/src/repositories/event-staging.repository.ts index 8662c6d1d..c642dae33 100644 --- a/packages/federation-sdk/src/repositories/event-staging.repository.ts +++ b/packages/federation-sdk/src/repositories/event-staging.repository.ts @@ -17,6 +17,7 @@ export class EventStagingRepository { eventId: EventID, origin: string, event: Pdu, + pendingInvite = false, ): Promise { // We use an upsert here to handle the case where we see the same event // from the same server multiple times. @@ -32,6 +33,7 @@ export class EventStagingRepository { }, $set: { event, + pendingInvite, }, }, { @@ -48,6 +50,7 @@ export class EventStagingRepository { return this.collection.findOne( { roomId, + pendingInvite: false, }, { sort: { createdAt: 1 }, @@ -58,4 +61,11 @@ export class EventStagingRepository { async getDistinctStagedRooms(): Promise { return this.collection.distinct('roomId'); } + + async unmarkInvitePending(eventId: EventID): Promise { + return this.collection.updateOne( + { _id: eventId }, + { $set: { pendingInvite: false } }, + ); + } } diff --git a/packages/federation-sdk/src/repositories/pending-invite.repository.ts b/packages/federation-sdk/src/repositories/pending-invite.repository.ts new file mode 100644 index 000000000..25d0f36fe --- /dev/null +++ b/packages/federation-sdk/src/repositories/pending-invite.repository.ts @@ -0,0 +1,40 @@ +import { EventID, Pdu } from '@hs/room'; +import { Collection } from 'mongodb'; +import { inject, singleton } from 'tsyringe'; + +export type PendingInvite = { + event: Pdu; + _id: EventID; + createdAt: Date; +}; + +@singleton() +export class PendingInviteRepository { + constructor( + @inject('PendingInviteCollection') + private readonly collection: Collection, + ) {} + + async add(eventId: EventID, event: Pdu): Promise { + await this.collection.insertOne({ + _id: eventId, + event, + createdAt: new Date(), + }); + } + + async findByUserIdAndRoomId( + userId: string, + roomId: string, + ): Promise { + return this.collection.findOne({ + 'event.type': 'm.room.member', + 'event.state_key': userId, + 'event.room_id': roomId, + }); + } + + async remove(eventId: EventID): Promise { + await this.collection.deleteOne({ _id: eventId }); + } +} diff --git a/packages/federation-sdk/src/services/event.service.ts b/packages/federation-sdk/src/services/event.service.ts index 170a8f2c1..f3edb6c4d 100644 --- a/packages/federation-sdk/src/services/event.service.ts +++ b/packages/federation-sdk/src/services/event.service.ts @@ -22,6 +22,7 @@ import { type Pdu, type PduForType, type PduType, + PersistentEventBase, PersistentEventFactory, getAuthChain, } from '@hs/room'; @@ -32,6 +33,7 @@ import { EventStagingRepository } from '../repositories/event-staging.repository import { EventRepository } from '../repositories/event.repository'; import { KeyRepository } from '../repositories/key.repository'; import { LockRepository } from '../repositories/lock.repository'; +import { PendingInviteRepository } from '../repositories/pending-invite.repository'; import { eventSchemas } from '../utils/event-schemas'; import { ConfigService } from './config.service'; import { EventEmitterService } from './event-emitter.service'; @@ -59,6 +61,8 @@ export class EventService { private readonly stateService: StateService, private readonly eventEmitterService: EventEmitterService, + + private readonly pendingInviteRepository: PendingInviteRepository, ) { // on startup we look for old staged events and try to process them setTimeout(() => { @@ -196,8 +200,22 @@ export class EventService { continue; } + const pendingInvite = await this.isSenderInvitePending( + event.sender, + event.room_id, + ); + // save the event as staged to be processed - await this.eventStagingRepository.create(eventId, origin, event); + await this.eventStagingRepository.create( + eventId, + origin, + event, + pendingInvite, + ); + + if (pendingInvite) { + continue; + } // acquire a lock for processing the event const lock = await this.lockRepository.getLock( @@ -806,4 +824,37 @@ export class EventService { throw error; } } + + async addPendingInvite(event: PersistentEventBase): Promise { + await this.pendingInviteRepository.add(event.eventId, event.event as Pdu); + } + + async removePendingInvite(eventId: EventID, roomId: string): Promise { + await Promise.all([ + this.pendingInviteRepository.remove(eventId), + this.eventStagingRepository.unmarkInvitePending(eventId), + ]); + + // acquire a lock for processing the event + const lock = await this.lockRepository.getLock( + roomId, + this.configService.instanceId, + ); + if (!lock) { + this.logger.debug(`Couldn't acquire a lock for room ${roomId}`); + return; + } + + // TODO change this to call stagingAreaService directly + this.stagingAreaQueue.enqueue(roomId); + } + + async isSenderInvitePending(sender: string, roomId: string) { + const invite = await this.pendingInviteRepository.findByUserIdAndRoomId( + sender, + roomId, + ); + + return !!invite; + } } diff --git a/packages/federation-sdk/src/services/invite.service.ts b/packages/federation-sdk/src/services/invite.service.ts index a6efa3279..07499e059 100644 --- a/packages/federation-sdk/src/services/invite.service.ts +++ b/packages/federation-sdk/src/services/invite.service.ts @@ -1,10 +1,5 @@ import { EventBase, HttpException, HttpStatus } from '@hs/core'; -import { - PduForType, - PersistentEventBase, - PersistentEventFactory, - RoomVersion, -} from '@hs/room'; +import { PduForType, PersistentEventFactory, RoomVersion } from '@hs/room'; import { singleton } from 'tsyringe'; import { createLogger } from '../utils/logger'; import { ConfigService } from './config.service'; @@ -70,7 +65,7 @@ export class InviteService { sender: sender, }, - roomInformation.room_version, + roomInformation.room_version as RoomVersion, ); // SPEC: Invites a remote user to a room. Once the event has been signed by both the inviting homeserver and the invited homeserver, it can be sent to all of the servers in the room by the inviting homeserver. @@ -113,7 +108,7 @@ export class InviteService { await stateService.persistStateEvent( PersistentEventFactory.createFromRawEvent( inviteResponse.event, - roomInformation.room_version, + roomInformation.room_version as RoomVersion, ), ); @@ -185,7 +180,11 @@ export class InviteService { } // we are not the host of the server - // so being the origin of the user, we sign the event and send it to the asking server, let them handle the transactions + // nor are we part of the room now. + + await this.eventService.addPendingInvite(inviteEvent); + + // being the origin of the user, we sign the event and send it to the asking server, let them handle the transactions return inviteEvent; } } diff --git a/packages/federation-sdk/src/services/room.service.ts b/packages/federation-sdk/src/services/room.service.ts index feea2baaf..2b35fc113 100644 --- a/packages/federation-sdk/src/services/room.service.ts +++ b/packages/federation-sdk/src/services/room.service.ts @@ -989,6 +989,11 @@ export class RoomService { throw new Error(joinEventFinal.rejectedReason); } + await this.eventService.removePendingInvite( + joinEventFinal.eventId, + joinEventFinal.roomId, + ); + return joinEventFinal.eventId; }