Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load Thread List with server-side assistance (MSC3856) #2602

Merged
merged 18 commits into from
Oct 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
350 changes: 339 additions & 11 deletions spec/integ/matrix-client-event-timeline.spec.ts

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions spec/unit/room.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import { UNSTABLE_ELEMENT_FUNCTIONAL_USERS } from "../../src/@types/event";
import { TestClient } from "../TestClient";
import { emitPromise } from "../test-utils/test-utils";
import { ReceiptType } from "../../src/@types/read_receipts";
import { Thread, ThreadEvent } from "../../src/models/thread";
import { FeatureSupport, Thread, ThreadEvent } from "../../src/models/thread";
import { WrappedReceipt } from "../../src/models/read-receipt";

describe("Room", function() {
Expand Down Expand Up @@ -2408,7 +2408,7 @@ describe("Room", function() {
});

it("should aggregate relations in thread event timeline set", () => {
Thread.setServerSideSupport(true, true);
Thread.setServerSideSupport(FeatureSupport.Stable);
const threadRoot = mkMessage();
const rootReaction = mkReaction(threadRoot);
const threadResponse = mkThreadResponse(threadRoot);
Expand Down
216 changes: 173 additions & 43 deletions src/client.ts

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions src/models/event-timeline-set.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,15 @@ export class EventTimelineSet extends TypedEventEmitter<EmittedEvents, EventTime
* @param {MatrixClient=} client the Matrix client which owns this EventTimelineSet,
* can be omitted if room is specified.
* @param {Thread=} thread the thread to which this timeline set relates.
* @param {boolean} isThreadTimeline Whether this timeline set relates to a thread list timeline
* (e.g., All threads or My threads)
*/
constructor(
public readonly room: Room | undefined,
opts: IOpts = {},
client?: MatrixClient,
public readonly thread?: Thread,
public readonly isThreadTimeline: boolean = false,
) {
super();

Expand Down
4 changes: 2 additions & 2 deletions src/models/event-timeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export class EventTimeline {
private endState: RoomState;
private prevTimeline?: EventTimeline;
private nextTimeline?: EventTimeline;
public paginationRequests: Record<Direction, Promise<boolean>> = {
public paginationRequests: Record<Direction, Promise<boolean> | null> = {
[Direction.Backward]: null,
[Direction.Forward]: null,
};
Expand Down Expand Up @@ -311,7 +311,7 @@ export class EventTimeline {
* token for going backwards in time; EventTimeline.FORWARDS to set the
* pagination token for going forwards in time.
*/
public setPaginationToken(token: string, direction: Direction): void {
public setPaginationToken(token: string | null, direction: Direction): void {
this.getState(direction).paginationToken = token;
}

Expand Down
2 changes: 1 addition & 1 deletion src/models/room-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export class RoomState extends TypedEventEmitter<EmittedEvents, EventHandlerMap>
// XXX: Should be read-only
public members: Record<string, RoomMember> = {}; // userId: RoomMember
public events = new Map<string, Map<string, MatrixEvent>>(); // Map<eventType, Map<stateKey, MatrixEvent>>
public paginationToken: string = null;
public paginationToken: string | null = null;

public readonly beacons = new Map<BeaconIdentifier, Beacon>();
private _liveBeaconIds: BeaconIdentifier[] = [];
Expand Down
187 changes: 132 additions & 55 deletions src/models/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ export class Room extends ReadReceipt<EmittedEvents, RoomEventHandlerMap> {
}

private threadTimelineSetsPromise: Promise<[EventTimelineSet, EventTimelineSet]> | null = null;
public async createThreadsTimelineSets(): Promise<[EventTimelineSet, EventTimelineSet]> {
public async createThreadsTimelineSets(): Promise<[EventTimelineSet, EventTimelineSet] | null> {
if (this.threadTimelineSetsPromise) {
return this.threadTimelineSetsPromise;
}
Expand All @@ -372,10 +372,13 @@ export class Room extends ReadReceipt<EmittedEvents, RoomEventHandlerMap> {
]);
const timelineSets = await this.threadTimelineSetsPromise;
this.threadsTimelineSets.push(...timelineSets);
return timelineSets;
} catch (e) {
this.threadTimelineSetsPromise = null;
return null;
}
}
return null;
}

/**
Expand Down Expand Up @@ -1612,7 +1615,14 @@ export class Room extends ReadReceipt<EmittedEvents, RoomEventHandlerMap> {

private async createThreadTimelineSet(filterType?: ThreadFilterType): Promise<EventTimelineSet> {
let timelineSet: EventTimelineSet;
if (Thread.hasServerSideSupport) {
if (Thread.hasServerSideListSupport) {
timelineSet =
new EventTimelineSet(this, this.opts, undefined, undefined, Boolean(Thread.hasServerSideListSupport));
this.reEmitter.reEmit(timelineSet, [
RoomEvent.Timeline,
RoomEvent.TimelineReset,
]);
} else if (Thread.hasServerSideSupport) {
const filter = await this.getThreadListFilter(filterType);

timelineSet = this.getOrCreateFilteredTimelineSet(
Expand Down Expand Up @@ -1645,81 +1655,148 @@ export class Room extends ReadReceipt<EmittedEvents, RoomEventHandlerMap> {
return timelineSet;
}

public threadsReady = false;
private threadsReady = false;

/**
* Takes the given thread root events and creates threads for them.
* @param events
* @param toStartOfTimeline
*/
public processThreadRoots(events: MatrixEvent[], toStartOfTimeline: boolean): void {
for (const rootEvent of events) {
EventTimeline.setEventMetadata(
rootEvent,
this.currentState,
toStartOfTimeline,
);
if (!this.getThread(rootEvent.getId())) {
this.createThread(rootEvent.getId(), rootEvent, [], toStartOfTimeline);
}
}
}

/**
* Fetch the bare minimum of room threads required for the thread list to work reliably.
* With server support that means fetching one page.
* Without server support that means fetching as much at once as the server allows us to.
*/
public async fetchRoomThreads(): Promise<void> {
if (this.threadsReady || !this.client.supportsExperimentalThreads()) {
return;
}

const allThreadsFilter = await this.getThreadListFilter();

const { chunk: events } = await this.client.createMessagesRequest(
this.roomId,
"",
Number.MAX_SAFE_INTEGER,
Direction.Backward,
allThreadsFilter,
);

if (!events.length) return;

// Sorted by last_reply origin_server_ts
const threadRoots = events
.map(this.client.getEventMapper())
.sort((eventA, eventB) => {
/**
* `origin_server_ts` in a decentralised world is far from ideal
* but for lack of any better, we will have to use this
* Long term the sorting should be handled by homeservers and this
* is only meant as a short term patch
*/
const threadAMetadata = eventA
.getServerAggregatedRelation<IThreadBundledRelationship>(RelationType.Thread);
const threadBMetadata = eventB
.getServerAggregatedRelation<IThreadBundledRelationship>(RelationType.Thread);
return threadAMetadata.latest_event.origin_server_ts - threadBMetadata.latest_event.origin_server_ts;
});
if (Thread.hasServerSideListSupport) {
await Promise.all([
this.fetchRoomThreadList(ThreadFilterType.All),
this.fetchRoomThreadList(ThreadFilterType.My),
]);
} else {
const allThreadsFilter = await this.getThreadListFilter();

const { chunk: events } = await this.client.createMessagesRequest(
this.roomId,
"",
Number.MAX_SAFE_INTEGER,
Direction.Backward,
allThreadsFilter,
);

let latestMyThreadsRootEvent: MatrixEvent;
const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS);
for (const rootEvent of threadRoots) {
this.threadsTimelineSets[0].addLiveEvent(rootEvent, {
duplicateStrategy: DuplicateStrategy.Ignore,
fromCache: false,
roomState,
});
if (!events.length) return;

// Sorted by last_reply origin_server_ts
const threadRoots = events
.map(this.client.getEventMapper())
.sort((eventA, eventB) => {
/**
* `origin_server_ts` in a decentralised world is far from ideal
* but for lack of any better, we will have to use this
* Long term the sorting should be handled by homeservers and this
* is only meant as a short term patch
*/
const threadAMetadata = eventA
.getServerAggregatedRelation<IThreadBundledRelationship>(THREAD_RELATION_TYPE.name);
const threadBMetadata = eventB
.getServerAggregatedRelation<IThreadBundledRelationship>(THREAD_RELATION_TYPE.name);
return threadAMetadata.latest_event.origin_server_ts -
threadBMetadata.latest_event.origin_server_ts;
});

const threadRelationship = rootEvent
.getServerAggregatedRelation<IThreadBundledRelationship>(RelationType.Thread);
if (threadRelationship.current_user_participated) {
this.threadsTimelineSets[1].addLiveEvent(rootEvent, {
let latestMyThreadsRootEvent: MatrixEvent;
const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS);
for (const rootEvent of threadRoots) {
this.threadsTimelineSets[0].addLiveEvent(rootEvent, {
duplicateStrategy: DuplicateStrategy.Ignore,
fromCache: false,
roomState,
});
latestMyThreadsRootEvent = rootEvent;
}

if (!this.getThread(rootEvent.getId())) {
this.createThread(rootEvent.getId(), rootEvent, [], true);
const threadRelationship = rootEvent
.getServerAggregatedRelation<IThreadBundledRelationship>(THREAD_RELATION_TYPE.name);
if (threadRelationship.current_user_participated) {
this.threadsTimelineSets[1].addLiveEvent(rootEvent, {
duplicateStrategy: DuplicateStrategy.Ignore,
fromCache: false,
roomState,
});
latestMyThreadsRootEvent = rootEvent;
}
}
}

this.client.decryptEventIfNeeded(threadRoots[threadRoots.length -1]);
if (latestMyThreadsRootEvent) {
this.client.decryptEventIfNeeded(latestMyThreadsRootEvent);
this.processThreadRoots(threadRoots, true);

this.client.decryptEventIfNeeded(threadRoots[threadRoots.length -1]);
if (latestMyThreadsRootEvent) {
this.client.decryptEventIfNeeded(latestMyThreadsRootEvent);
}
}

this.on(ThreadEvent.NewReply, this.onThreadNewReply);
this.threadsReady = true;
}

this.on(ThreadEvent.NewReply, this.onThreadNewReply);
/**
* Fetch a single page of threadlist messages for the specific thread filter
* @param filter
* @private
*/
private async fetchRoomThreadList(filter?: ThreadFilterType): Promise<void> {
const timelineSet = filter === ThreadFilterType.My
? this.threadsTimelineSets[1]
: this.threadsTimelineSets[0];

const { chunk: events, end } = await this.client.createThreadListMessagesRequest(
this.roomId,
null,
undefined,
Direction.Backward,
timelineSet.getFilter(),
);

timelineSet.getLiveTimeline().setPaginationToken(end, Direction.Backward);

if (!events.length) return;

const matrixEvents = events.map(this.client.getEventMapper());
this.processThreadRoots(matrixEvents, true);
const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS);
for (const rootEvent of matrixEvents) {
timelineSet.addLiveEvent(rootEvent, {
duplicateStrategy: DuplicateStrategy.Replace,
fromCache: false,
roomState,
});
}
}

private onThreadNewReply(thread: Thread): void {
const roomState = this.getLiveTimeline().getState(EventTimeline.FORWARDS);
for (const timelineSet of this.threadsTimelineSets) {
timelineSet.removeEvent(thread.id);
timelineSet.addLiveEvent(thread.rootEvent);
timelineSet.addLiveEvent(thread.rootEvent, {
duplicateStrategy: DuplicateStrategy.Replace,
fromCache: false,
roomState,
});
}
}

Expand Down Expand Up @@ -1865,8 +1942,6 @@ export class Room extends ReadReceipt<EmittedEvents, RoomEventHandlerMap> {
this.lastThread = thread;
}

this.emit(ThreadEvent.New, thread, toStartOfTimeline);

if (this.threadsReady) {
this.threadsTimelineSets.forEach(timelineSet => {
if (thread.rootEvent) {
Expand All @@ -1883,6 +1958,8 @@ export class Room extends ReadReceipt<EmittedEvents, RoomEventHandlerMap> {
});
}

this.emit(ThreadEvent.New, thread, toStartOfTimeline);

return thread;
}

Expand Down
33 changes: 29 additions & 4 deletions src/models/thread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,28 @@ interface IThreadOpts {
client: MatrixClient;
}

export enum FeatureSupport {
None = 0,
Experimental = 1,
Stable = 2
}

export function determineFeatureSupport(stable: boolean, unstable: boolean): FeatureSupport {
if (stable) {
return FeatureSupport.Stable;
} else if (unstable) {
return FeatureSupport.Experimental;
} else {
return FeatureSupport.None;
}
}

/**
* @experimental
*/
export class Thread extends ReadReceipt<EmittedEvents, EventHandlerMap> {
public static hasServerSideSupport: boolean;
public static hasServerSideSupport = FeatureSupport.None;
public static hasServerSideListSupport = FeatureSupport.None;

/**
* A reference to all the events ID at the bottom of the threads
Expand Down Expand Up @@ -134,15 +151,23 @@ export class Thread extends ReadReceipt<EmittedEvents, EventHandlerMap> {
this.emit(ThreadEvent.Update, this);
}

public static setServerSideSupport(hasServerSideSupport: boolean, useStable: boolean): void {
Thread.hasServerSideSupport = hasServerSideSupport;
if (!useStable) {
public static setServerSideSupport(
status: FeatureSupport,
): void {
Thread.hasServerSideSupport = status;
if (status !== FeatureSupport.Stable) {
FILTER_RELATED_BY_SENDERS.setPreferUnstable(true);
FILTER_RELATED_BY_REL_TYPES.setPreferUnstable(true);
THREAD_RELATION_TYPE.setPreferUnstable(true);
}
}

public static setServerSideListSupport(
status: FeatureSupport,
): void {
Thread.hasServerSideListSupport = status;
}

private onBeforeRedaction = (event: MatrixEvent, redaction: MatrixEvent) => {
if (event?.isRelation(THREAD_RELATION_TYPE.name) &&
this.room.eventShouldLiveIn(event).threadId === this.id &&
Expand Down