diff --git a/packages/core/src/models/event.model.ts b/packages/core/src/models/event.model.ts index 562cd4df5..f253c355e 100644 --- a/packages/core/src/models/event.model.ts +++ b/packages/core/src/models/event.model.ts @@ -33,6 +33,8 @@ export interface EventStore extends PersistentEventBase { reason: string; rejectedBy?: EventID; }; + + partial: boolean; } export interface EventStagingStore extends PersistentEventBase { diff --git a/packages/federation-sdk/src/repositories/event.repository.ts b/packages/federation-sdk/src/repositories/event.repository.ts index 4010332e5..1c85f8be1 100644 --- a/packages/federation-sdk/src/repositories/event.repository.ts +++ b/packages/federation-sdk/src/repositories/event.repository.ts @@ -6,6 +6,7 @@ import { PduForType, PduType, RejectCode, + RoomID, StateID, } from '@rocket.chat/federation-room'; import type { @@ -391,6 +392,7 @@ export class EventRepository { eventId: EventID, event: Pdu, stateId: StateID, + partial = false, ): Promise { return this.collection.updateOne( { _id: eventId }, @@ -402,6 +404,7 @@ export class EventRepository { }, $set: { stateId, + partial, }, }, { upsert: true }, @@ -444,6 +447,7 @@ export class EventRepository { event, stateId, nextEventId: '', + partial: false, }, $set: { rejectCode: code, @@ -467,4 +471,11 @@ export class EventRepository { findByType(type: PduType) { return this.collection.find({ 'event.type': type }); } + + findPartialsByRoomId(roomId: RoomID) { + return this.collection.find( + { 'event.room_id': roomId, partial: true }, + { sort: { 'event.depth': 1, createdAt: 1 } }, + ); + } } diff --git a/packages/federation-sdk/src/repositories/state-graph.repository.ts b/packages/federation-sdk/src/repositories/state-graph.repository.ts index 351032dc2..3e0bf36fc 100644 --- a/packages/federation-sdk/src/repositories/state-graph.repository.ts +++ b/packages/federation-sdk/src/repositories/state-graph.repository.ts @@ -21,6 +21,8 @@ export type StateGraphStore = { depth: number; createdAt: Date; + + partial: boolean; }; @singleton() @@ -180,6 +182,8 @@ export class StateGraphRepository { chainId = new ObjectId().toString(); } + const partial = event.isPartial() || (previousDelta?.partial ?? false); + await this.collection.insertOne({ _id: stateId, createdAt: new Date(), @@ -190,6 +194,7 @@ export class StateGraphRepository { previousNode: previousStateId, chainId, depth, + partial, }); return stateId; diff --git a/packages/federation-sdk/src/services/event-fetcher.service.ts b/packages/federation-sdk/src/services/event-fetcher.service.ts index b887e6adc..8d09daba9 100644 --- a/packages/federation-sdk/src/services/event-fetcher.service.ts +++ b/packages/federation-sdk/src/services/event-fetcher.service.ts @@ -87,7 +87,7 @@ export class EventFetcherService { }; } - private async fetchEventsFromFederation( + async fetchEventsFromFederation( eventIds: string[], targetServerName: string, ): Promise { diff --git a/packages/federation-sdk/src/services/federation.service.ts b/packages/federation-sdk/src/services/federation.service.ts index 068f55443..7fed0c33f 100644 --- a/packages/federation-sdk/src/services/federation.service.ts +++ b/packages/federation-sdk/src/services/federation.service.ts @@ -1,12 +1,11 @@ import type { EventBase } from '@rocket.chat/federation-core'; import type { BaseEDU } from '@rocket.chat/federation-core'; -import type { ProtocolVersionKey } from '@rocket.chat/federation-core'; import { createLogger } from '@rocket.chat/federation-core'; import { Pdu, - PduForType, PersistentEventBase, PersistentEventFactory, + extractDomainFromId, } from '@rocket.chat/federation-room'; import { singleton } from 'tsyringe'; import { @@ -235,7 +234,15 @@ export class FederationService { } async sendEventToAllServersInRoom(event: PersistentEventBase) { - const servers = await this.stateService.getServersInRoom(event.roomId); + const servers = await this.stateService.getServerSetInRoom(event.roomId); + + if (event.stateKey) { + const server = extractDomainFromId(event.stateKey); + // TODO: fgetser + if (!servers.has(server)) { + servers.add(server); + } + } for (const server of servers) { if (server === event.origin) { diff --git a/packages/federation-sdk/src/services/invite.service.ts b/packages/federation-sdk/src/services/invite.service.ts index fa72edfd8..56a98f18f 100644 --- a/packages/federation-sdk/src/services/invite.service.ts +++ b/packages/federation-sdk/src/services/invite.service.ts @@ -7,12 +7,13 @@ import { RoomID, RoomVersion, UserID, + extractDomainFromId, } from '@rocket.chat/federation-room'; import { singleton } from 'tsyringe'; import { ConfigService } from './config.service'; import { EventService } from './event.service'; import { FederationService } from './federation.service'; -import { StateService } from './state.service'; +import { StateService, UnknownRoomError } from './state.service'; // TODO: Have better (detailed/specific) event input type export type ProcessInviteEvent = { event: EventBase; @@ -50,7 +51,7 @@ export class InviteService { const stateService = this.stateService; const federationService = this.federationService; - const roomInformation = await stateService.getRoomInformation(roomId); + const roomVersion = await this.stateService.getRoomVersion(roomId); // Extract displayname from userId for direct messages const displayname = isDirectMessage @@ -76,12 +77,12 @@ export class InviteService { sender: sender, }, - roomInformation.room_version, + roomVersion, ); // SPEC: Invites a remote user to a room. Once the event has been signed by both the inviting homeserver and the invited homeserver, it can be sent to all of the servers in the room by the inviting homeserver. - const invitedServer = inviteEvent.stateKey?.split(':').pop(); + const invitedServer = extractDomainFromId(inviteEvent.stateKey ?? ''); if (!invitedServer) { throw new Error( `invalid state_key ${inviteEvent.stateKey}, no server_name part`, @@ -92,10 +93,6 @@ export class InviteService { if (invitedServer === this.configService.serverName) { await stateService.handlePdu(inviteEvent); - if (inviteEvent.rejected) { - throw new Error(inviteEvent.rejectReason); - } - // let all servers know of this state change // without it join events will not be processed if /event/{eventId} causes problems void federationService.sendEventToAllServersInRoom(inviteEvent); @@ -104,7 +101,7 @@ export class InviteService { event_id: inviteEvent.eventId, event: PersistentEventFactory.createFromRawEvent( inviteEvent.event, - roomInformation.room_version, + roomVersion, ), room_id: roomId, }; @@ -115,7 +112,7 @@ export class InviteService { const inviteResponse = await federationService.inviteUser( inviteEvent, - roomInformation.room_version, + roomVersion, ); // try to save @@ -123,7 +120,7 @@ export class InviteService { await stateService.handlePdu( PersistentEventFactory.createFromRawEvent( inviteResponse.event, - roomInformation.room_version, + roomVersion, ), ); @@ -134,7 +131,7 @@ export class InviteService { event_id: inviteEvent.eventId, event: PersistentEventFactory.createFromRawEvent( inviteEvent.event, - roomInformation.room_version, + roomVersion, ), room_id: roomId, }; @@ -171,9 +168,6 @@ export class InviteService { // attempt to persist the invite event as we already have the state await this.stateService.handlePdu(inviteEvent); - if (inviteEvent.rejected) { - throw new Error(inviteEvent.rejectReason); - } // we do not send transaction here // the asking server will handle the transactions @@ -182,21 +176,6 @@ export class InviteService { return inviteEvent; } - // are we already in the room? - try { - await this.stateService.getRoomInformation(roomId); - - // if we have the state we try to persist the invite event - await this.stateService.handlePdu(inviteEvent); - if (inviteEvent.rejected) { - throw new Error(inviteEvent.rejectReason); - } - } catch { - // don't have state copy yet - // console.error(e); - // typical noop, we sign and return the event, nothing to do - } - // we are not the host of the server // so being the origin of the user, we sign the event and send it to the asking server, let them handle the transactions return inviteEvent; diff --git a/packages/federation-sdk/src/services/profiles.service.ts b/packages/federation-sdk/src/services/profiles.service.ts index a07862af8..e6a872324 100644 --- a/packages/federation-sdk/src/services/profiles.service.ts +++ b/packages/federation-sdk/src/services/profiles.service.ts @@ -87,6 +87,14 @@ export class ProfilesService { throw new Error(`Unsupported room version: ${roomVersion}`); } + if ( + !(await this.stateService.getLatestRoomState2(roomId)).isUserInvited( + userId, + ) + ) { + throw new Error(`User ${userId} is not invited`); + } + const membershipEvent = await stateService.buildEvent<'m.room.member'>( { type: 'm.room.member', diff --git a/packages/federation-sdk/src/services/room.service.ts b/packages/federation-sdk/src/services/room.service.ts index 46e1e39e1..6cb524943 100644 --- a/packages/federation-sdk/src/services/room.service.ts +++ b/packages/federation-sdk/src/services/room.service.ts @@ -4,6 +4,7 @@ import { RoomPowerLevelsEvent, SignedEvent, TombstoneAuthEvents, + createLogger, roomPowerLevelsEvent, } from '@rocket.chat/federation-core'; import { singleton } from 'tsyringe'; @@ -23,9 +24,12 @@ import { PduType, PersistentEventBase, PersistentEventFactory, + EventStore as RoomEventStore, RoomID, RoomVersion, UserID, + checkEventAuthWithState, + extractDomainFromId, } from '@rocket.chat/federation-room'; import { EventRepository } from '../repositories/event.repository'; import { RoomRepository } from '../repositories/room.repository'; @@ -33,11 +37,13 @@ import { ConfigService } from './config.service'; import { EventService } from './event.service'; import { EventEmitterService } from './event-emitter.service'; +import { EventFetcherService } from './event-fetcher.service'; import { InviteService } from './invite.service'; -import { StateService } from './state.service'; +import { StateService, UnknownRoomError } from './state.service'; @singleton() export class RoomService { + private readonly logger = createLogger('RoomService'); constructor( private readonly roomRepository: RoomRepository, private readonly eventRepository: EventRepository, @@ -47,6 +53,7 @@ export class RoomService { private readonly stateService: StateService, private readonly inviteService: InviteService, private readonly eventEmitterService: EventEmitterService, + private readonly eventFetcherService: EventFetcherService, ) {} private validatePowerLevelChange( @@ -782,7 +789,7 @@ export class RoomService { const federationService = this.federationService; // where the room is hosted at - const residentServer = roomId.split(':').pop(); + const residentServer = extractDomainFromId(roomId); // our own room, we can validate the join event by ourselves // once done, emit the event to all participating servers @@ -846,7 +853,7 @@ export class RoomService { // ^ have the template for the join event now const joinEvent = PersistentEventFactory.createFromRawEvent( - makeJoinResponse.event, // TODO: using room package types will take care of this + makeJoinResponse.event, makeJoinResponse.room_version, ); @@ -854,81 +861,261 @@ export class RoomService { // TODO: sign the event here vvv // currently makeSignedRequest does the signing - const sendJoinResponse = await federationService.sendJoin(joinEvent); + const { + state, + auth_chain: authChain, + event, + servers_in_room: serversInRoom = [], + } = await federationService.sendJoin(joinEvent); // TODO: validate hash and sig (item 2) // run through state res // validate all auth chain events - const eventMap = new Map(); + try { + await stateService.getRoomVersion(roomId); + + this.logger.info({ roomId }, 'state already exists'); + } catch (error) { + if (error instanceof UnknownRoomError) { + // if already in room, skip this, walk join event to fill the state + await stateService.processInitialState(state, authChain); + } + } - for (const stateEvent_ of sendJoinResponse.state) { - const stateEvent = PersistentEventFactory.createFromRawEvent( - stateEvent_, - makeJoinResponse.room_version, + if (await stateService.isRoomStatePartial(roomId)) { + this.logger.info( + { roomId }, + 'received incomplete graph of state from send_join, completing state before processing join', ); - eventMap.set(stateEvent.eventId, stateEvent); - } + const partialEvents = await stateService.getPartialEvents(roomId); - for (const authEvent_ of sendJoinResponse.auth_chain) { - const authEvent = PersistentEventFactory.createFromRawEvent( - authEvent_, - makeJoinResponse.room_version, + this.logger.info( + { roomId, partialEventIds: partialEvents.map((e) => e.eventId) }, + 'events with incomplete states', ); - eventMap.set(authEvent.eventId, authEvent); - } - const sorted = Array.from(eventMap.values()).sort((a, b) => { - if (a.depth !== b.depth) { - return a.depth - b.depth; - } + for (const event of partialEvents) { + this.logger.info({ roomId, eventId: event.eventId }, 'walking branch'); + + const missingBranchEvents = ( + await this._fetchFullBranch( + event.getPreviousEventIds(), + residentServer, + serversInRoom, + { event }, + ) + ).sort((e1, e2) => { + if (e1.depth !== e2.depth) { + return e1.depth - e2.depth; + } - if (a.originServerTs !== b.originServerTs) { - return a.originServerTs - b.originServerTs; - } + if (e1.originServerTs !== e2.originServerTs) { + return e1.originServerTs - e2.originServerTs; + } - return a.eventId.localeCompare(b.eventId); - }); + return e1.eventId.localeCompare(e2.eventId); + }); - for (const event of sorted) { - logger.debug({ - msg: 'Persisting event', - eventId: event.eventId, - event: event.event, - }); - await stateService.handlePdu(event); + for (const missingEvent of missingBranchEvents) { + this.logger.info( + { + eventId: event.eventId, + depth: event.depth, + previous: event.getPreviousEventIds(), + roomId, + }, + 'processing state at event', + ); + await stateService._resolveStateAtEvent(missingEvent); + } + + // if partial, join event will also be partial + await stateService._resolveStateAtEvent(event); + } + + if (await stateService.isRoomStatePartial(roomId)) { + throw new Error( + `${roomId} still in partial state after processing all branches`, + ); + } + } else { + this.logger.info( + { roomId }, + 'received complete graph of state from send_join, nothing to do', + ); } const joinEventFinal = PersistentEventFactory.createFromRawEvent( - sendJoinResponse.event, + event, makeJoinResponse.room_version, ); + // FIXME: this should be here, but since using join event to walk and repopulate missing message, there is no gurantee the check will pass + + // with the state we have, run auth check + // const room = await stateService.getLatestRoomState(roomId); + + // try { + // await checkEventAuthWithState( + // joinEventFinal, + // room, + // stateService._getStore(roomVersion), + // ); + // } catch (error) { + // this.logger.error( + // { error, roomId, eventId: joinEventFinal.eventId }, + // 'failed to join room, join event did not pass auth check', + // ); + + // throw error; + // } + logger.info({ msg: 'Persisting join event', eventId: joinEventFinal.eventId, event: joinEventFinal.event, }); - const state = await stateService.getLatestRoomState(roomId); - - logger.info({ - msg: 'State before join event has been persisted', - state: state.keys().toArray().join(', '), - }); - // try to persist the join event now, should succeed with state in place await this.eventService.processIncomingPDUs( residentServer || joinEventFinal.origin, [joinEventFinal.event], ); - if (joinEventFinal.rejected) { - throw new Error(joinEventFinal.rejectReason); + return joinEventFinal.eventId; + } + + private async _fetchFullBranch( + eventIds: EventID[], + residentServer: string, + serverList: string[], + context: { event: PersistentEventBase }, + ) { + if (eventIds.length === 0) { + return []; } - return joinEventFinal.eventId; + const previousEvents = [] as PersistentEventBase[]; + const roomId = context.event.roomId; + const roomVersion = context.event.version; + const store = this.stateService._getStore(roomVersion); + + let missing = [] as EventID[]; + + const { events, missing: stillMissing } = await this._fetchMissingEvents( + eventIds, + roomVersion, + store, + residentServer, + ); + + missing = stillMissing; + previousEvents.push(...events); + + if (missing.length === 0) { + previousEvents.push( + ...(await this._fetchFullBranch( + previousEvents.flatMap((e) => e.getPreviousEventIds()), + residentServer, + serverList, + context, + )), + ); + + return previousEvents; + } + + const logContext = () => ({ + roomId, + branchEventId: context.event.eventId, + missing, + }); + + this.logger.info(logContext(), 'failed to fetch and process some events'); + + if (serverList.length === 0) { + this.logger.warn( + logContext(), + 'not enough servers participating in the room to retry missing events', + ); + + throw new Error(); + } + + for (let i = 0; i < serverList.length && missing.length > 0; i++) { + const askingServer = serverList[i]; + + this.logger.warn( + logContext(), + `attempting to fetch events from participating server ${askingServer}`, + ); + + const { events, missing: stillMissing } = await this._fetchMissingEvents( + missing, + roomVersion, + store, + askingServer, + ); + + missing = stillMissing; + previousEvents.push(...events); + } + + if (missing.length > 0) { + this.logger.error( + logContext(), + 'server list exhausted, we still have missing events', + ); + + throw new Error(); + } + + // all found + previousEvents.push( + ...(await this._fetchFullBranch( + previousEvents.flatMap((e) => e.getPreviousEventIds()), + residentServer, + serverList, + context, + )), + ); + + return previousEvents; + } + + private async _fetchMissingEvents( + eventIds: EventID[], + roomVersion: RoomVersion, + store: RoomEventStore, + askedServerName: string, + ) { + const seenEvents = await store.getEvents(eventIds); + + if (seenEvents.length === eventIds.length) { + return { missing: [], events: [] }; + } + + const needsFetching = new Set(eventIds) + .difference(new Set(seenEvents.map((e) => e.eventId))) + .values() + .toArray(); + + const remotePdus = await this.eventFetcherService.fetchEventsFromFederation( + needsFetching, + askedServerName, + ); + + const cache = new Map(); + for (const pdu of remotePdus) { + const base = PersistentEventFactory.createFromRawEvent(pdu, roomVersion); + cache.set(base.eventId, base); + } + + const stillMissing = needsFetching.filter((id) => !cache.has(id)); + + return { missing: stillMissing, events: cache.values() }; } async markRoomAsTombstone( diff --git a/packages/federation-sdk/src/services/send-join.service.ts b/packages/federation-sdk/src/services/send-join.service.ts index f04e02196..9b693845f 100644 --- a/packages/federation-sdk/src/services/send-join.service.ts +++ b/packages/federation-sdk/src/services/send-join.service.ts @@ -42,7 +42,7 @@ export class SendJoinService { } const joinEvent = await this.stateService.buildEvent<'m.room.member'>( - { ...event, auth_events: [] }, + event, roomVersion, ); diff --git a/packages/federation-sdk/src/services/staging-area.service.ts b/packages/federation-sdk/src/services/staging-area.service.ts index 0854f5f01..4cc8d6de9 100644 --- a/packages/federation-sdk/src/services/staging-area.service.ts +++ b/packages/federation-sdk/src/services/staging-area.service.ts @@ -23,7 +23,7 @@ import { EventService } from './event.service'; import { LockRepository } from '../repositories/lock.repository'; import { ConfigService } from './config.service'; import { MissingEventService } from './missing-event.service'; -import { StateService } from './state.service'; +import { PartialStateResolutionError, StateService } from './state.service'; class MissingAuthorizationEventsError extends Error { constructor(message: string) { @@ -98,6 +98,12 @@ export class StagingAreaService { eventId: event._id, err, }); + } else if (err instanceof PartialStateResolutionError) { + this.logger.info({ + msg: 'Still joining room, postponing event processing', + eventId: event._id, + err, + }); } else { this.logger.error({ msg: 'Error processing event', diff --git a/packages/federation-sdk/src/services/state.service.spec.ts b/packages/federation-sdk/src/services/state.service.spec.ts index 3451174ac..ec2242aab 100644 --- a/packages/federation-sdk/src/services/state.service.spec.ts +++ b/packages/federation-sdk/src/services/state.service.spec.ts @@ -1,4 +1,4 @@ -import { describe, expect, it, spyOn, test } from 'bun:test'; +import { beforeEach, describe, expect, it, spyOn, test } from 'bun:test'; import { type EventStore } from '@rocket.chat/federation-core'; import * as room from '@rocket.chat/federation-room'; import { @@ -139,6 +139,13 @@ describe('StateService', async () => { await database.getDb() ).collection('state_graph_test'); + beforeEach(async () => { + await Promise.all([ + eventCollection.deleteMany(), + stateGraphCollection.deleteMany(), + ]); + }); + const eventRepository = new EventRepository(eventCollection); const stateGraphRepository = new StateGraphRepository(stateGraphCollection); @@ -245,6 +252,574 @@ describe('StateService', async () => { }; }; + const getStore = ( + cache: Map, + ): room.EventStore => ({ + getEvents: (eventIds: EventID[]) => { + return Promise.resolve(eventIds.map((eid) => cache.get(eid)!)); + }, + }); + + const partialStateEvents = await Promise.all([ + async () => { + const username = '@alice:anotherserver.com' as room.UserID; + const name = 'Test Partial State Room'; + + const roomCreateEvent = PersistentEventFactory.newCreateEvent( + username as room.UserID, + PersistentEventFactory.defaultRoomVersion, + ); + + const roomVersion = roomCreateEvent.version; + + const creatorMembershipEvent = + await stateService.buildEvent<'m.room.member'>( + { + type: 'm.room.member', + room_id: roomCreateEvent.roomId, + sender: username as room.UserID, + state_key: username as room.UserID, + content: { membership: 'join' }, + ...getDefaultFields(), + prev_events: [roomCreateEvent.eventId], + auth_events: [roomCreateEvent.eventId], + depth: 1, + }, + roomVersion, + ); + + // insert a random message event to make the tree incomplete + const messageEvent = await stateService.buildEvent<'m.room.message'>( + { + room_id: roomCreateEvent.roomId, + sender: username, + content: { body: 'hello world', msgtype: 'm.text' }, + type: 'm.room.message', + ...getDefaultFields(), + prev_events: [creatorMembershipEvent.eventId], + auth_events: [ + creatorMembershipEvent.eventId, + roomCreateEvent.eventId, + ], + depth: 2, + }, + roomVersion, + ); + + const roomNameEvent = await stateService.buildEvent<'m.room.name'>( + { + room_id: roomCreateEvent.roomId, + sender: username as room.UserID, + content: { name }, + state_key: '', + type: 'm.room.name', + ...getDefaultFields(), + prev_events: [messageEvent.eventId], + auth_events: [ + creatorMembershipEvent.eventId, + roomCreateEvent.eventId, + ], + depth: 3, + }, + roomVersion, + ); + + const joinRuleEvent = await stateService.buildEvent<'m.room.join_rules'>( + { + room_id: roomCreateEvent.roomId, + sender: username as room.UserID, + content: { join_rule: 'public' }, + type: 'm.room.join_rules', + state_key: '', + ...getDefaultFields(), + prev_events: [roomNameEvent.eventId], + auth_events: [ + roomCreateEvent.eventId, + creatorMembershipEvent.eventId, + ], + depth: 4, + }, + roomVersion, + ); + + const powerLevelEvent = + await stateService.buildEvent<'m.room.power_levels'>( + { + type: 'm.room.power_levels', + room_id: roomCreateEvent.roomId, + sender: username as room.UserID, + state_key: '', + content: { + users: { + [username]: 100, + }, + users_default: 0, + events: {}, + events_default: 0, + state_default: 50, + ban: 50, + kick: 50, + redact: 50, + invite: 50, + }, + ...getDefaultFields(), + prev_events: [joinRuleEvent.eventId], + auth_events: [ + roomCreateEvent.eventId, + creatorMembershipEvent.eventId, + joinRuleEvent.eventId, + ], + depth: 5, + }, + roomVersion, + ); + + const ourUserJoinEvent = await stateService.buildEvent<'m.room.member'>( + { + type: 'm.room.member', + room_id: roomCreateEvent.roomId, + sender: '@us:example.com' as room.UserID, + state_key: '@us:example.com' as room.UserID, + content: { membership: 'join' }, + ...getDefaultFields(), + prev_events: [powerLevelEvent.eventId], + auth_events: [ + roomCreateEvent.eventId, + powerLevelEvent.eventId, + joinRuleEvent.eventId, + ], + depth: 6, + }, + roomVersion, + ); + const state = { + roomCreateEvent, + powerLevelEvent, + creatorMembershipEvent, + roomNameEvent, + ourUserJoinEvent, + joinRuleEvent, + }; + + const map = new Map(); + const authChainSet = new Set(); + for (const event of Object.values(state)) { + map.set(event.eventId, event); + } + + const store = getStore(map); + + for (const event of Object.values(state)) { + for (const eventId of await room.getAuthChain(event, store)) { + authChainSet.add(eventId); + } + } + + const authChain = Array.from(authChainSet.values()).map( + (eid) => map.get(eid)!, + ); + + return { + state, + authChain, + missingEvents: [messageEvent] as PersistentEventBase[], + }; + }, + // multiple missing in the middle + async () => { + const username = '@alice:anotherserver.com' as room.UserID; + const name = 'Test Partial State Room'; + + const roomCreateEvent = PersistentEventFactory.newCreateEvent( + username as room.UserID, + PersistentEventFactory.defaultRoomVersion, + ); + + const roomVersion = roomCreateEvent.version; + + const creatorMembershipEvent = + await stateService.buildEvent<'m.room.member'>( + { + type: 'm.room.member', + room_id: roomCreateEvent.roomId, + sender: username as room.UserID, + state_key: username as room.UserID, + content: { membership: 'join' }, + ...getDefaultFields(), + prev_events: [roomCreateEvent.eventId], + auth_events: [roomCreateEvent.eventId], + depth: 1, + }, + roomVersion, + ); + + // insert a random message event to make the tree incomplete + const messageEvent = await stateService.buildEvent<'m.room.message'>( + { + room_id: roomCreateEvent.roomId, + sender: username, + content: { body: 'hello world', msgtype: 'm.text' }, + type: 'm.room.message', + ...getDefaultFields(), + prev_events: [creatorMembershipEvent.eventId], + auth_events: [ + creatorMembershipEvent.eventId, + roomCreateEvent.eventId, + ], + depth: 2, + }, + roomVersion, + ); + + // this should become an extremity now since no known event will point to this + const roomNameEvent = await stateService.buildEvent<'m.room.name'>( + { + room_id: roomCreateEvent.roomId, + sender: username as room.UserID, + content: { name }, + state_key: '', + type: 'm.room.name', + ...getDefaultFields(), + prev_events: [messageEvent.eventId], + auth_events: [ + creatorMembershipEvent.eventId, + roomCreateEvent.eventId, + ], + depth: 3, + }, + roomVersion, + ); + + // insert another random message event to make the tree incomplete + const messageEvent2 = await stateService.buildEvent<'m.room.message'>( + { + room_id: roomCreateEvent.roomId, + sender: username, + content: { body: 'hello world', msgtype: 'm.text' }, + type: 'm.room.message', + ...getDefaultFields(), + prev_events: [roomNameEvent.eventId], + auth_events: [ + creatorMembershipEvent.eventId, + roomCreateEvent.eventId, + ], + depth: 4, + }, + roomVersion, + ); + + const joinRuleEvent = await stateService.buildEvent<'m.room.join_rules'>( + { + room_id: roomCreateEvent.roomId, + sender: username as room.UserID, + content: { join_rule: 'public' }, + type: 'm.room.join_rules', + state_key: '', + ...getDefaultFields(), + prev_events: [messageEvent2.eventId], + auth_events: [ + roomCreateEvent.eventId, + creatorMembershipEvent.eventId, + ], + depth: 5, + }, + roomVersion, + ); + + const powerLevelEvent = + await stateService.buildEvent<'m.room.power_levels'>( + { + type: 'm.room.power_levels', + room_id: roomCreateEvent.roomId, + sender: username as room.UserID, + state_key: '', + content: { + users: { + [username]: 100, + }, + users_default: 0, + events: {}, + events_default: 0, + state_default: 50, + ban: 50, + kick: 50, + redact: 50, + invite: 50, + }, + ...getDefaultFields(), + prev_events: [joinRuleEvent.eventId], + auth_events: [ + roomCreateEvent.eventId, + creatorMembershipEvent.eventId, + joinRuleEvent.eventId, + ], + depth: 6, + }, + roomVersion, + ); + + const ourUserJoinEvent = await stateService.buildEvent<'m.room.member'>( + { + type: 'm.room.member', + room_id: roomCreateEvent.roomId, + sender: '@us:example.com' as room.UserID, + state_key: '@us:example.com' as room.UserID, + content: { membership: 'join' }, + ...getDefaultFields(), + prev_events: [powerLevelEvent.eventId], + auth_events: [ + roomCreateEvent.eventId, + powerLevelEvent.eventId, + joinRuleEvent.eventId, + ], + depth: 7, + }, + roomVersion, + ); + + const state = { + roomCreateEvent, + powerLevelEvent, + creatorMembershipEvent, + roomNameEvent, + ourUserJoinEvent, + joinRuleEvent, + }; + + const map = new Map(); + const authChainSet = new Set(); + for (const event of Object.values(state)) { + map.set(event.eventId, event); + } + + const store = getStore(map); + + for (const event of Object.values(state)) { + for (const eventId of await room.getAuthChain(event, store)) { + authChainSet.add(eventId); + } + } + + const authChain = Array.from(authChainSet.values()).map( + (eid) => map.get(eid)!, + ); + + return { + state, + authChain, + + missingEvents: [messageEvent, messageEvent2] as PersistentEventBase[], + }; + }, + async () => { + const creator = '@alice:anotherserver.com' as room.UserID; + const name = 'Test Partial State Room'; + + const roomCreateEvent = PersistentEventFactory.newCreateEvent( + creator as room.UserID, + PersistentEventFactory.defaultRoomVersion, + ); + + const roomVersion = roomCreateEvent.version; + + const creatorMembershipEvent = + await stateService.buildEvent<'m.room.member'>( + { + type: 'm.room.member', + room_id: roomCreateEvent.roomId, + sender: creator as room.UserID, + state_key: creator as room.UserID, + content: { membership: 'join' }, + ...getDefaultFields(), + prev_events: [roomCreateEvent.eventId], + auth_events: [roomCreateEvent.eventId], + depth: 1, + }, + roomVersion, + ); + + // insert a random message event to make the tree incomplete + const messageEvent = await stateService.buildEvent<'m.room.message'>( + { + room_id: roomCreateEvent.roomId, + sender: creator, + content: { body: 'hello world', msgtype: 'm.text' }, + type: 'm.room.message', + ...getDefaultFields(), + prev_events: [creatorMembershipEvent.eventId], + auth_events: [ + creatorMembershipEvent.eventId, + roomCreateEvent.eventId, + ], + depth: 2, + }, + roomVersion, + ); + + // this should become an extremity now since no known event will point to this + const roomNameEvent = await stateService.buildEvent<'m.room.name'>( + { + room_id: roomCreateEvent.roomId, + sender: creator as room.UserID, + content: { name }, + state_key: '', + type: 'm.room.name', + ...getDefaultFields(), + prev_events: [messageEvent.eventId], + auth_events: [ + creatorMembershipEvent.eventId, + roomCreateEvent.eventId, + ], + depth: 3, + }, + roomVersion, + ); + + // insert another random message event to make the tree incomplete + const messageEvent2 = await stateService.buildEvent<'m.room.message'>( + { + room_id: roomCreateEvent.roomId, + sender: creator, + content: { body: 'hello world', msgtype: 'm.text' }, + type: 'm.room.message', + ...getDefaultFields(), + prev_events: [roomNameEvent.eventId], + auth_events: [ + creatorMembershipEvent.eventId, + roomCreateEvent.eventId, + ], + depth: 4, + }, + roomVersion, + ); + + const joinRuleEvent = await stateService.buildEvent<'m.room.join_rules'>( + { + room_id: roomCreateEvent.roomId, + sender: creator as room.UserID, + content: { join_rule: 'invite' }, + type: 'm.room.join_rules', + state_key: '', + ...getDefaultFields(), + prev_events: [messageEvent2.eventId], + auth_events: [ + roomCreateEvent.eventId, + creatorMembershipEvent.eventId, + ], + depth: 5, + }, + roomVersion, + ); + + const powerLevelEvent = + await stateService.buildEvent<'m.room.power_levels'>( + { + type: 'm.room.power_levels', + room_id: roomCreateEvent.roomId, + sender: creator as room.UserID, + state_key: '', + content: { + users: { + [creator]: 100, + }, + users_default: 0, + events: {}, + events_default: 0, + state_default: 50, + ban: 50, + kick: 50, + redact: 50, + invite: 50, + }, + ...getDefaultFields(), + prev_events: [joinRuleEvent.eventId], + auth_events: [ + roomCreateEvent.eventId, + creatorMembershipEvent.eventId, + joinRuleEvent.eventId, + ], + depth: 6, + }, + roomVersion, + ); + + const ourUserInviteEvent = await stateService.buildEvent<'m.room.member'>( + { + type: 'm.room.member', + room_id: roomCreateEvent.roomId, + sender: creator, + state_key: '@us:example.com' as room.UserID, + content: { membership: 'invite' }, + ...getDefaultFields(), + prev_events: [powerLevelEvent.eventId], + auth_events: [ + roomCreateEvent.eventId, + powerLevelEvent.eventId, + joinRuleEvent.eventId, + creatorMembershipEvent.eventId, + ], + depth: 7, + }, + roomVersion, + ); + + const ourUserJoinEvent = await stateService.buildEvent<'m.room.member'>( + { + type: 'm.room.member', + room_id: roomCreateEvent.roomId, + sender: '@us:example.com' as room.UserID, + state_key: '@us:example.com' as room.UserID, + content: { membership: 'join' }, + ...getDefaultFields(), + prev_events: [ourUserInviteEvent.eventId], + auth_events: [ + roomCreateEvent.eventId, + powerLevelEvent.eventId, + joinRuleEvent.eventId, + ourUserInviteEvent.eventId, + ], + depth: 8, + }, + roomVersion, + ); + + const state = { + roomCreateEvent, + powerLevelEvent, + creatorMembershipEvent, + roomNameEvent, + ourUserJoinEvent, + ourUserInviteEvent, + joinRuleEvent, + }; + + const map = new Map(); + const authChainSet = new Set(); + for (const event of Object.values(state)) { + map.set(event.eventId, event); + } + + const store = getStore(map); + + for (const event of Object.values(state)) { + for (const eventId of await room.getAuthChain(event, store)) { + authChainSet.add(eventId); + } + } + + const authChain = Array.from(authChainSet.values()).map( + (eid) => map.get(eid)!, + ); + + return { + state, + authChain, + + missingEvents: [messageEvent, messageEvent2] as PersistentEventBase[], + }; + }, + ]); + const joinUser = async (roomId: string, userId: string) => { return _setUserMembership(roomId, userId, 'join'); }; @@ -1683,4 +2258,170 @@ describe('StateService', async () => { expect(event?.isAuthRejected()).toBeFalse(); } }); + + const label = (label: string, i: number) => { + return `[${i}] ${label}`; + }; + + for (let i = 1; i <= partialStateEvents.length; i++) { + describe(label('partial states', i), () => { + it(label('should not be able to complete the chain', i), async () => { + const { state } = await partialStateEvents[i - 1](); + const eventMap = new Map(); + const events = Object.values(state); + + for (const event of events) { + eventMap.set(event.eventId, event); + } + + const hasNoPartial = events.every((event) => + event.getPreviousEventIds().every((prev) => eventMap.has(prev)), + ); + + expect(hasNoPartial).toBeFalse(); + }); + + it(label('should be able to save partial states', i), async () => { + const { state, authChain } = await partialStateEvents[i - 1](); + const events = Object.values(state); + + const stateId = await stateService.processInitialState( + events.map((e) => e.event), + authChain.map((e) => e.event), + ); + + console.log(state.ourUserJoinEvent.eventId); + + expect(stateId).toBeString(); + + const event = await stateService.getEvent( + state.ourUserJoinEvent.eventId, + ); + expect(event?.isPartial()).toBeTrue(); + }); + + it(label( + 'should be able to save and detect partial states', + i, + ), async () => { + const { state, authChain } = await partialStateEvents[i - 1](); + const events = Object.values(state); + + await stateService.processInitialState( + events.map((e) => e.event), + authChain.map((e) => e.event), + ); + + expect( + stateService.isRoomStatePartial(events[0].roomId), + ).resolves.toBeTrue(); + }); + + it(label( + 'should complete the state as missing events get filled', + i, + ), async () => { + const { state, authChain, missingEvents } = + await partialStateEvents[i - 1](); + + await stateService.processInitialState( + Object.values(state).map((e) => e.event), + authChain.map((e) => e.event), + ); + + expect( + stateService.isRoomStatePartial(state.roomCreateEvent.roomId), + ).resolves.toBeTrue(); + + const eventStore = new Map(); + for (const e of (Object.values(state) as PersistentEventBase[]).concat( + missingEvents, + )) { + eventStore.set(e.eventId, e); + } + + const eventsToWalk = await stateService.getPartialEvents( + state.creatorMembershipEvent.roomId, + ); + + const store = stateService._getStore(state.roomCreateEvent.version); + + const remoteFetch = async (eventIds: EventID[]) => { + return eventIds.map((e) => eventStore.get(e)); + }; + + const walk = async (event: PersistentEventBase) => { + // for each previous event, walk + const previousEventsInStore = await store.getEvents( + event.getPreviousEventIds(), + ); + if ( + previousEventsInStore.length === event.getPreviousEventIds().length + ) { + console.log(`All previous events found in store ${event.eventId}`); + // start processing this event now + await stateService._resolveStateAtEvent(event); + return; + } + + const eventIdsToFind = [] as EventID[]; + for (const previousEventId of event.getPreviousEventIds()) { + if ( + !previousEventsInStore + .map((p) => p.eventId) + .includes(previousEventId) + ) { + eventIdsToFind.push(previousEventId); + } + } + + console.log(`Events to find ${eventIdsToFind}`); + + const previousEvents = (await remoteFetch( + eventIdsToFind, + )) as PersistentEventBase[]; + + expect(previousEvents.length).toBe( + event.getPreviousEventIds().length, + ); + + previousEvents + .sort((e1, e2) => { + if (e1.depth !== e2.depth) { + return e1.depth - e2.depth; + } + + if (e1.originServerTs !== e2.originServerTs) { + return e1.originServerTs - e2.originServerTs; + } + + return e1.eventId.localeCompare(e2.eventId); + }) + .reverse(); + + for (const previousEvent of previousEvents) { + console.log(`Waling ${previousEvent.eventId}`); + await walk(previousEvent); + } + + console.log( + `Finishing saving ${event.eventId}, all [${event.getPreviousEventIds().join(', ')}] events has been saved`, + ); + + // once all previous events have been walked we process this event + await stateService._resolveStateAtEvent(event); + }; + + for (const event of eventsToWalk) { + console.log(`Starting walking ${event.eventId}`); + await walk(event).catch(console.error); + } + + // now room should state to not be in partial state + expect( + stateService.isRoomStatePartial(state.roomCreateEvent.roomId), + ).resolves.toBeFalse(); + }); + }); + } }); diff --git a/packages/federation-sdk/src/services/state.service.ts b/packages/federation-sdk/src/services/state.service.ts index e26ce1cfa..15c498180 100644 --- a/packages/federation-sdk/src/services/state.service.ts +++ b/packages/federation-sdk/src/services/state.service.ts @@ -2,6 +2,7 @@ import { createLogger, signEvent } from '@rocket.chat/federation-core'; import { type EventID, type EventStore, + Pdu, type PduContent, PduCreateEventContent, PduForType, @@ -47,6 +48,19 @@ function yieldPairs(list: T[]): [T, T][] { return pairs; } +export class PartialStateResolutionError extends Error { + constructor(event: PersistentEventBase) { + const message = `Unable to process event, we don't have complete state yet (${event.toStrippedJson()})`; + super(message); + } +} + +export class UnknownRoomError extends Error { + constructor(roomId: RoomID) { + super(`Room ${roomId} does not exist`); + } +} + @singleton() export class StateService { private readonly logger = createLogger('StateService'); @@ -83,7 +97,7 @@ export class StateService { 'm.room.create', ); if (!createEvent) { - throw new Error('Create event not found for room version'); + throw new UnknownRoomError(roomId as RoomID); } return createEvent.event.content?.room_version as RoomVersion; @@ -122,6 +136,7 @@ export class StateService { event.eventId, event.event, stateId, + event.isPartial(), ); await this.updateNextEventReferencesWithEvent(event); @@ -189,6 +204,7 @@ export class StateService { const pdu = PersistentEventFactory.createFromRawEvent( event.event, roomVersion, + event.partial, ); if (event.rejectCode !== undefined) { @@ -290,13 +306,17 @@ export class StateService { event.roomId, ); + const events = [] as PersistentEventBase[]; + for (const prevEvent of prevEvents) { const e = PersistentEventFactory.createFromRawEvent( prevEvent.event, roomVersion, ); - event.addPrevEvents([e]); + events.push(e); } + + event.addPrevEvents(events); } public async signEvent(event: T) { @@ -368,15 +388,211 @@ export class StateService { ); } + // saves a full/partial state + // returns the final state id + async processInitialState(pdus: Pdu[], authChain: Pdu[]) { + const create = authChain.find((pdu) => pdu.type === 'm.room.create'); + if (create?.type !== 'm.room.create') { + throw new Error('No create event found in auth chain to save'); + } + + const version = create.content.room_version; + + // auth chain for whole state, if sorted by depth, should never have multiples with same branches + // this confirms correct sorting and being able to save with correct state for each + + // build the map first because .. ?? feels iterative now but makes sense ig + + const authChainCache = new Map(); + for (const pdu of authChain) { + const event = PersistentEventFactory.createFromRawEvent(pdu, version); + if (!authChainCache.has(event.eventId)) { + authChainCache.set(event.eventId, event); + } + } + + const eventCache = new Map(); + for (const pdu of pdus) { + const event = PersistentEventFactory.createFromRawEvent(pdu, version); + if (eventCache.has(event.eventId) || authChainCache.has(event.eventId)) { + continue; + } + eventCache.set(event.eventId, event); + } + + // handle create separately + const createEvent = PersistentEventFactory.createFromRawEvent( + create, + version, + ); + const stateId = await this.stateRepository.createDelta( + createEvent, + '' as StateID, + ); + await this.addToRoomGraph(createEvent, stateId); + + this.logger.info( + { eventId: createEvent.eventId, roomId: createEvent.roomId, stateId }, + 'create event saved', + ); + + const getAuthEventStateMap = (e: PersistentEventBase) => { + return e.getAuthEventIds().reduce((accum, curr) => { + // every event should have it's auth events in the auth chain + const event = authChainCache.get(curr); + if (event) { + accum.set(event.getUniqueStateIdentifier(), event); + } + return accum; + }, new Map()); + }; + + const store = this._getStore(version); + + const sortedEvents = Array.from(eventCache.values()) + .concat(Array.from(authChainCache.values())) + .sort((e1, e2) => { + if (e1.depth !== e2.depth) { + return e1.depth - e2.depth; + } + + if (e1.originServerTs !== e2.originServerTs) { + return e1.originServerTs - e2.originServerTs; + } + + return e1.eventId.localeCompare(e2.eventId); + }); + + let previousStateId = stateId; + + for (const event of sortedEvents) { + const authState = getAuthEventStateMap(event); + try { + await checkEventAuthWithState(event, authState, store); + } catch (error) { + this.logger.error( + { + eventId: event.eventId, + authEvents: event.getAuthEventIds(), + }, + 'event failed auth check while saving state, this should not have happened while walking an auth chain, the chain is incorrect', + ); + + // propagating throw, at this point this is not supposed to fail, something is wrong with the state we received + throw error; + } + + // auth events themseleves can be partial at any point + event.setPartial( + // if some of the previous events are partial this one also needs to be partial + event + .getPreviousEventIds() + .some((id) => { + const event = authChainCache.get(id) || eventCache.get(id); + // event notseen + if (!event) { + return true; + } + + // seen event is also partial + return event.isPartial(); + }), + ); + previousStateId = await this.stateRepository.createDelta( + event, + previousStateId, + ); + await this.addToRoomGraph(event, previousStateId); + } + + return previousStateId; + } + + private async _neeedsProcessing

( + event: P, + ): Promise

{ + const record = await this.eventRepository.findById(event.eventId); + if (record?.partial) { + // event is saved and is partial, pass it + event.setPartial(true); + return event; + } + + const previousEvents = await this.eventRepository + .findByIds(event.getPreviousEventIds()) + .toArray(); + if (previousEvents.length !== event.getPreviousEventIds().length) { + // if we don't have all the previous events now, this is a partial state + event.setPartial(true); + return event; + } + + if (previousEvents.some((e) => e.partial)) { + // if any of the previouseventsis partial this is too + event.setPartial(true); + return event; + } + + // isn't partial, check if already stored, then skip + if (record) { + return null; + } + + return event; + } + + async isRoomStatePartial(roomId: RoomID) { + const events = await this.eventRepository.findLatestEvents(roomId); + const stateIds = new Set(events.map((e) => e.stateId)); + switch (stateIds.size) { + case 0: + return false; + case 1: { + const stateId = stateIds.values().toArray().pop(); + const delta = + stateId && (await this.stateRepository.findOneById(stateId)); + if (!delta) { + throw new Error(`No delta found for ${stateId}`); + } + return delta.partial; + } + default: { + const deltas = await this.stateRepository.findByStateIds( + stateIds.values().toArray(), + ); + + for await (const delta of deltas) { + if (delta.partial) { + return true; + } + } + + return false; + } + } + } + // handle received pdu from transaction // implements spec:https://spec.matrix.org/v1.12/server-server-api/#checks-performed-on-receipt-of-a-pdu // TODO: this is not state related, can and should accept timeline events too, move to event service? - async handlePdu(event: PersistentEventBase): Promise { - const exists = await this.eventRepository.findById(event.eventId); - if (exists) { + async handlePdu

(pdu: P): Promise { + if (pdu.isCreateEvent()) { + this.logger.debug({ eventId: pdu.eventId }, 'handling create event'); + const stateId = await this.stateRepository.createDelta( + pdu, + '' as StateID, + ); + + await this.addToRoomGraph(pdu, stateId); + + return; + } + + const event = await this._neeedsProcessing(pdu); + if (!event) { this.logger.debug( - { eventId: event.eventId }, - 'event exists but attempted to persist again, should not happen', + { eventId: pdu.eventId }, + 'event saved and not in partial state, skipping processing', ); return; } @@ -386,19 +602,11 @@ export class StateService { 'handling pdu', ); - // handle create events separately - if (event.isCreateEvent()) { - this.logger.debug({ eventId: event.eventId }, 'handling create event'); - const stateId = await this.stateRepository.createDelta( - event, - '' as StateID, - ); - - await this.addToRoomGraph(event, stateId); - - return; + if (await this.isRoomStatePartial(event.roomId)) { + throw new PartialStateResolutionError(event); } + // handle create events separately // TODO: 1. Is a valid event, otherwise it is dropped. For an event to be valid, it must contain a room_id, and it must comply with the event format of that room version. // 2. Passes signature checks, otherwise it is dropped. // ^ done someplace else. move here? TODO: @@ -566,9 +774,7 @@ export class StateService { // const stateId = await this.getStateIdBeforeEvent(event); // return this.getStateAtStateId(stateId, event.version); // } - - // TODO: i think moving this to repo will help with schema change diff - async getServersInRoom(roomId: string) { + async getServerSetInRoom(roomId: string) { const state = await this.getLatestRoomState(roomId); const servers = new Set(); @@ -591,7 +797,12 @@ export class StateService { } } - return Array.from(servers); + return servers; + } + + // @deprecated use getServerSetInRoom + async getServersInRoom(roomId: string) { + return Array.from(await this.getServerSetInRoom(roomId)); } private async _isSameChain(stateIds: StateID[]) { @@ -722,7 +933,17 @@ export class StateService { }, 'previous events', ); - throw new Error(`no state at event ${event.eventId}`); + throw new Error(`no previous state for event ${event.eventId}`); + } + + if (event.isPartial()) { + // walked over to this, since we have the state at this event, toggle event to be not partial any longer + this.logger.debug( + { eventId: event.eventId }, + 'completing state at event', + ); + // previous states by this point should NOT be partial + event.setPartial(!event.isPartial()); } // different stateids, may need to run state resolution @@ -833,6 +1054,10 @@ export class StateService { ? roomVersion_ : await this.getRoomVersion(roomId); const records = await this.eventRepository.findLatestEvents(roomId); + this.logger.debug( + { roomId, events: records.map((r) => r._id) }, + 'current latest events', + ); const stateIds = new Set(); for (const record of records) { stateIds.add(record.stateId); @@ -933,4 +1158,18 @@ export class StateService { return result; } + + async getPartialEvents(roomId: RoomID) { + const roomVersion = await this.getRoomVersion(roomId); + return this.eventRepository + .findPartialsByRoomId(roomId) + .map((rec) => + PersistentEventFactory.createFromRawEvent( + rec.event, + roomVersion, + rec.partial, + ), + ) + .toArray(); + } } diff --git a/packages/room/src/manager/event-wrapper.ts b/packages/room/src/manager/event-wrapper.ts index c97c0b053..d91d6f092 100644 --- a/packages/room/src/manager/event-wrapper.ts +++ b/packages/room/src/manager/event-wrapper.ts @@ -65,6 +65,7 @@ export abstract class PersistentEventBase< constructor( event: PduWithHashesAndSignaturesOptional, public readonly version: Version, + private partial = false, ) { this.rawEvent = JSON.parse(JSON.stringify(event)); if (this.rawEvent.signatures) { @@ -75,7 +76,7 @@ export abstract class PersistentEventBase< this.authEventsIds.add(id); } } - if (this.rawEvent.prev_events) { + if (this.rawEvent.prev_events?.length) { for (const id of this.rawEvent.prev_events) { this.prevEventsIds.add(id); } @@ -442,6 +443,14 @@ export abstract class PersistentEventBase< if (rejectedBy) this.rejectedBy = rejectedBy; } + isPartial() { + return this.partial; + } + + setPartial(partial: boolean) { + this.partial = partial; + } + addPrevEvents(events: PersistentEventBase[]) { for (const event of events) { this.prevEventsIds.add(event.eventId); diff --git a/packages/room/src/manager/factory.ts b/packages/room/src/manager/factory.ts index d0e6d21fc..216bdf7f9 100644 --- a/packages/room/src/manager/factory.ts +++ b/packages/room/src/manager/factory.ts @@ -53,6 +53,7 @@ export class PersistentEventFactory { static createFromRawEvent( event: PduWithHashesAndSignaturesOptional, roomVersion: string, + partial = false, ): PersistentEventBase { if (!PersistentEventFactory.isSupportedRoomVersion(roomVersion)) { throw new Error(`Room version ${roomVersion} is not supported`); @@ -62,17 +63,17 @@ export class PersistentEventFactory { case '3': case '4': case '5': - return new PersistentEventV3(event, roomVersion); + return new PersistentEventV3(event, roomVersion, partial); case '6': case '7': - return new PersistentEventV6(event, roomVersion); + return new PersistentEventV6(event, roomVersion, partial); case '8': - return new PersistentEventV8(event, roomVersion); + return new PersistentEventV8(event, roomVersion, partial); case '9': case '10': - return new PersistentEventV9(event, roomVersion); + return new PersistentEventV9(event, roomVersion, partial); case '11': - return new PersistentEventV11(event, roomVersion); + return new PersistentEventV11(event, roomVersion, partial); default: throw new Error(`Unknown room version: ${roomVersion}`); } @@ -104,7 +105,7 @@ export class PersistentEventFactory { room_id: roomId, prev_events: [], auth_events: [], - depth: 0, + depth: 1, }; return PersistentEventFactory.createFromRawEvent<'m.room.create'>(