Skip to content

Commit

Permalink
Fan out properly
Browse files Browse the repository at this point in the history
Fixes #20
  • Loading branch information
turt2live committed Mar 31, 2023
1 parent cfbcb37 commit ce698cc
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 9 deletions.
4 changes: 3 additions & 1 deletion src/server/FederationClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export class FederationClient {
if (res.status !== 200) {
throw new Error("Failed to send invite to server: " + (await res.text()));
}
await res.text(); // consume response
}

public async sendEvents(events: MatrixEvent[]): Promise<void> {
Expand All @@ -55,7 +56,8 @@ export class FederationClient {
},
});
if (res.status !== 200) {
throw new Error("Failed to send invite to server: " + (await res.text()));
throw new Error("Failed to send events to server: " + (await res.text()));
}
await res.text(); // consume response
}
}
21 changes: 16 additions & 5 deletions src/server/FederationServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,36 @@ export class FederationServer {
continue;
}

const room = this.roomStore.getRoom(roomId);
let room = this.roomStore.getRoom(roomId);
if (!room) {
rejected.push(event);
continue;
if (event["type"] === "m.room.create") {
room = await Room.createRoomFromCreateEvent(event, this.keyStore);
}
if (!room) {
rejected.push(event);
continue;
} else {
this.roomStore.addRoom(room);
this.csApi.sendEventsToClients(room, [event]);
continue; // don't try to add the m.room.create event to the room
}
}

// TODO: Ensure we route invites correctly - https://github.com/matrix-org/linearized-matrix/issues/19

try {
await room.sendEvent(event);
await room.sendEvent(event, true);
this.csApi.sendEventsToClients(room, [event]);
// TODO: Fan out - https://github.com/matrix-org/linearized-matrix/issues/20
} catch (e) {
console.error(e);
rejected.push(event);
return res.status(500).json({
errcode: "M_UNKNOWN",
error: `${e && typeof e === "object" ? (e as any).message ?? `${e}` : e}`,
});
}
}

res.json({});
}
}
4 changes: 4 additions & 0 deletions src/server/RoomStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,8 @@ export class RoomStore {
public getRoom(roomId: string): Room | undefined {
return this.rooms.find(r => r.roomId === roomId);
}

public addRoom(room: Room) {
this.rooms.push(room);
}
}
10 changes: 8 additions & 2 deletions src/server/client_server_api/ClientServerApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,15 @@ export class ClientServerApi {
content: packet.content,
});
try {
await room.sendEvent(event);
this.sendToClients(room, {type: PacketType.Event, event: event} as EventPacket);
if (room.ownerDomain !== Runtime.signingKey.serverName) {
const federation = new FederationClient(room.ownerDomain);
await federation.sendEvents([event]);
} else {
await room.sendEvent(event, true);
this.sendToClients(room, {type: PacketType.Event, event: event} as EventPacket);
}
} catch (e) {
console.error(e);
this.sendToClient(client, {
type: PacketType.Error,
message: (e as Error)?.message ?? "Unknown error",
Expand Down
73 changes: 72 additions & 1 deletion src/server/models/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {DefaultRoomVersion, getRoomVersionImpl} from "../room_versions/map";
import {calculateContentHash} from "../util/hashing";
import {Runtime} from "../Runtime";
import {KeyStore} from "../KeyStore";
import {FederationClient} from "../FederationClient";

export class Room {
private events: MatrixEvent[] = [];
Expand Down Expand Up @@ -57,11 +58,69 @@ export class Room {
*
* Operates asynchronously.
* @param event The event to send
* @param fanout True to send the event over federation if the current
* server is the room owner. False otherwise (default).
*/
public async sendEvent(event: MatrixEvent): Promise<void> {
public async sendEvent(event: MatrixEvent, fanout = false): Promise<void> {
const remote = getDomainFromId(event.sender);
if (
remote != Runtime.signingKey.serverName &&
event.owner_server === this.ownerDomain &&
this.ownerDomain === Runtime.signingKey.serverName
) {
// We need to sign this too, as owners
const redacted = this.roomVersion.redact(event);
const signed = Runtime.signingKey.signJson(redacted);
event = {...event, signatures: signed.signatures};
}

await this.roomVersion.checkValidity(event, this.keyStore);
this.roomVersion.checkAuth(event, this.events);
this.events.push(event);

if (fanout && this.ownerDomain === Runtime.signingKey.serverName) {
const joinedMembers = this.currentState
.getAll("m.room.member")
.filter(m => m.content["membership"] === "join");
const joinedServers = new Set(joinedMembers.map(m => getDomainFromId(m.state_key!)));
joinedServers.delete(this.ownerDomain); // we don't want to send to ourselves

if (
event.type === "m.room.member" &&
event.content["membership"] === "join" &&
remote !== this.ownerDomain
) {
// a server might have just joined - try to find the previous membership event
let prevEvent: MatrixEvent | undefined;
for (const pEvent of this.events) {
if (pEvent.type === "m.room.member" && pEvent.state_key === event.state_key && pEvent !== event) {
prevEvent = pEvent;
}
}
// XXX: This check assumes there's only ever 1 user from each server, so we'd end up
// sending the create event (and others) multiple times if there were multiple users.
if (!prevEvent || prevEvent.content["membership"] === "invite") {
try {
console.log(`Sharing history with ${remote} because they just joined the room`);
// TODO: History visibility - https://github.com/matrix-org/linearized-matrix/issues/21
const federation = new FederationClient(remote);
await federation.sendEvents(this.events.filter(e => e !== event)); // we'll send the current event in a moment
} catch (e) {
console.error(e);
}
}
}

for (const domain of joinedServers) {
try {
console.log(`Sending ${event.type} to ${domain}`);
const federation = new FederationClient(domain);
await federation.sendEvents([event]);
} catch (e) {
console.error(e);
}
}
}
}

public createEventFrom(
Expand Down Expand Up @@ -114,6 +173,18 @@ export class Room {
return membershipEvent;
}

public static async createRoomFromCreateEvent(event: MatrixEvent, keyStore: KeyStore): Promise<Room | undefined> {
const version = event["content"]?.["room_version"];
const impl = getRoomVersionImpl(version);
if (!impl) {
return undefined;
}

const room = new Room(event["room_id"], impl, keyStore);
await room.sendEvent(event);
return room;
}

public static async createRoomForRemoteJoin(
userId: string,
roomId: string,
Expand Down

0 comments on commit ce698cc

Please sign in to comment.