diff --git a/apps/meteor/app/ui-utils/client/lib/LegacyRoomManager.ts b/apps/meteor/app/ui-utils/client/lib/LegacyRoomManager.ts index b53715599480f..f00d53cddf46e 100644 --- a/apps/meteor/app/ui-utils/client/lib/LegacyRoomManager.ts +++ b/apps/meteor/app/ui-utils/client/lib/LegacyRoomManager.ts @@ -1,48 +1,47 @@ import type { IMessage, IRoom } from '@rocket.chat/core-typings'; +import { Emitter } from '@rocket.chat/emitter'; import { createPredicateFromFilter } from '@rocket.chat/mongo-adapter'; import type { Filter } from '@rocket.chat/mongo-adapter'; -import { ReactiveVar } from 'meteor/reactive-var'; -import { Tracker } from 'meteor/tracker'; import { upsertMessage, RoomHistoryManager } from './RoomHistoryManager'; -import { mainReady } from './mainReady'; import { RoomManager } from '../../../../client/lib/RoomManager'; import { roomCoordinator } from '../../../../client/lib/rooms/roomCoordinator'; import { fireGlobalEvent } from '../../../../client/lib/utils/fireGlobalEvent'; import { getConfig } from '../../../../client/lib/utils/getConfig'; import { callbacks } from '../../../../lib/callbacks'; -import { Messages, Subscriptions, CachedChatSubscription } from '../../../models/client'; +import { Messages, Subscriptions } from '../../../models/client'; import { sdk } from '../../../utils/client/lib/SDKClient'; const maxRoomsOpen = parseInt(getConfig('maxRoomsOpen') ?? '5') || 5; -const openedRooms: Record< - string, - { - typeName: string; - rid: IRoom['_id']; - ready: boolean; - active: boolean; - dom?: Node; - streamActive?: boolean; - unreadSince: ReactiveVar; - lastSeen: Date; - unreadFirstId?: string; - } -> = {}; +type ListenRoomPropsByRidProps = keyof OpenedRoom; +type ListenRoomPropsByRidPropsEvent = `${string}/${ListenRoomPropsByRidProps}`; + +const listener = new Emitter<{ + [key in ListenRoomPropsByRidPropsEvent]: undefined; +}>(); + +type OpenedRoom = { + typeName: string; + rid: IRoom['_id']; + ready: boolean; + dom?: Node; + streamActive?: boolean; + unreadSince: Date | undefined; + lastSeen: Date; + unreadFirstId?: string; + stream?: { + stop: () => void; + }; +}; -const openedRoomsDependency = new Tracker.Dependency(); +const openedRooms: Record = {}; function close(typeName: string) { if (openedRooms[typeName]) { - if (openedRooms[typeName].rid) { - sdk.stop('room-messages', openedRooms[typeName].rid); - sdk.stop('notify-room', `${openedRooms[typeName].rid}/deleteMessage`); - sdk.stop('notify-room', `${openedRooms[typeName].rid}/deleteMessageBulk`); - } + openedRooms[typeName].stream?.stop(); openedRooms[typeName].ready = false; - openedRooms[typeName].active = false; delete openedRooms[typeName].dom; @@ -72,152 +71,187 @@ async function closeAllRooms() { } } +function listenRoomPropsByRid( + rid: IRoom['_id'], + prop: T, +): { + subscribe: (cb: () => void) => () => void; + getSnapshotValue: () => OpenedRoom[T]; +} { + return { + subscribe: (cb: () => void) => { + return listener.on(`${rid}/${prop}`, cb); + }, + getSnapshotValue: (): OpenedRoom[T] => { + return getOpenedRoomByRid(rid)?.[prop] as OpenedRoom[T]; + }, + }; +} + +function setPropertyByRid(room: OpenedRoom, prop: T, value: OpenedRoom[T]): OpenedRoom[T] | undefined; +function setPropertyByRid(rid: IRoom['_id'], prop: T, value: OpenedRoom[T]): OpenedRoom[T] | undefined; +function setPropertyByRid( + ridOrRoom: IRoom['_id'] | OpenedRoom, + prop: T, + value: OpenedRoom[T], +): OpenedRoom[T] | undefined { + const room = typeof ridOrRoom === 'string' ? getOpenedRoomByRid(ridOrRoom) : ridOrRoom; + const rid = typeof ridOrRoom === 'string' ? ridOrRoom : room?.rid; + if (!room) { + return; + } + room[prop] = value; + listener.emit(`${rid}/${prop}`); +} + function getOpenedRoomByRid(rid: IRoom['_id']) { - openedRoomsDependency.depend(); return Object.keys(openedRooms) .map((typeName) => openedRooms[typeName]) .find((openedRoom) => openedRoom.rid === rid); } -const computation = Tracker.autorun(() => { - if (!mainReady.get()) { +const openRoom = (typeName: string, record: OpenedRoom) => { + if (record.ready === true && record.streamActive === true) { return; } - Tracker.nonreactive(() => - Object.entries(openedRooms).forEach(([typeName, record]) => { - if (record.active !== true || (record.ready === true && record.streamActive === true)) { - return; - } - - const type = typeName.slice(0, 1); - const name = typeName.slice(1); - - const room = roomCoordinator.getRoomDirectives(type).findRoom(name); - - if (room) { - if (record.streamActive !== true) { - void sdk - .stream('room-messages', [record.rid], async (msg) => { - // Should not send message to room if room has not loaded all the current messages - // if (RoomHistoryManager.hasMoreNext(record.rid) !== false) { - // return; - // } - // Do not load command messages into channel - if (msg.t !== 'command') { - const subscription = Subscriptions.findOne({ rid: record.rid }, { reactive: false }); - const isNew = !Messages.state.find((record) => record._id === msg._id && record.temp !== true); - ({ _id: msg._id, temp: { $ne: true } }); - await upsertMessage({ msg, subscription }); - - if (isNew) { - await callbacks.run('streamNewMessage', msg); - } - } - - await callbacks.run('streamMessage', { ...msg, name: room.name || '' }); - - fireGlobalEvent('new-message', { - ...msg, - name: room.name || '', - room: { - type, - name, - }, - }); - }) - - .ready() - .then(() => { - record.streamActive = true; - openedRoomsDependency.changed(); - }); - - // when we receive a messages imported event we just clear the room history and fetch it again - sdk.stream('notify-room', [`${record.rid}/messagesImported`], async () => { - await RoomHistoryManager.clear(record.rid); - await RoomHistoryManager.getMore(record.rid); - }); - - sdk.stream('notify-room', [`${record.rid}/deleteMessage`], (msg) => { - Messages.state.delete(msg._id); - - // remove thread refenrece from deleted message - Messages.state.update( - (record) => record.tmid === msg._id, - ({ tmid: _, ...record }) => record, - ); - }); - - sdk.stream( - 'notify-room', - [`${record.rid}/deleteMessageBulk`], - ({ rid, ts, excludePinned, ignoreDiscussion, users, ids, showDeletedStatus }) => { - const query: Filter = { rid }; - - if (ids) { - query._id = { $in: ids }; - } else { - query.ts = ts; - } - if (excludePinned) { - query.pinned = { $ne: true }; - } - if (ignoreDiscussion) { - query.drid = { $exists: false }; - } - if (users?.length) { - query['u.username'] = { $in: users }; - } - - const predicate = createPredicateFromFilter(query); - - if (showDeletedStatus) { - return Messages.state.update(predicate, (record) => ({ - ...record, - t: 'rm', - msg: '', - urls: [], - mentions: [], - attachments: [], - reactions: {}, - })); - } - - return Messages.state.remove(predicate); - }, - ); - sdk.stream('notify-room', [`${record.rid}/messagesRead`], ({ tmid, until }) => { - if (tmid) { - Messages.state.update( - (record) => record.tmid === tmid && record.unread === true, - ({ unread: _, ...record }) => record, - ); - return; - } - Messages.state.update( - (r) => - r.rid === record.rid && r.unread === true && r.ts.getTime() < until.getTime() && (r.tmid === undefined || r.tshow === true), - ({ unread: _, ...r }) => r, - ); - }); + if (record.streamActive === true) { + return; + } + + const type = typeName.slice(0, 1); + const name = typeName.slice(1); + + const room = roomCoordinator.getRoomDirectives(type).findRoom(name); + + if (!room) { + return; + } + + const streams: ReturnType[] = []; + + streams.push( + ...[ + sdk.stream('room-messages', [record.rid], async (msg) => { + // Should not send message to room if room has not loaded all the current messages + // if (RoomHistoryManager.hasMoreNext(record.rid) !== false) { + // return; + // } + // Do not load command messages into channel + if (msg.t !== 'command') { + const subscription = Subscriptions.findOne({ rid: record.rid }, { reactive: false }); + const isNew = !Messages.state.find((record) => record._id === msg._id && record.temp !== true); + ({ _id: msg._id, temp: { $ne: true } }); + await upsertMessage({ msg, subscription }); + if (isNew) { + await callbacks.run('streamNewMessage', msg); + } } - } - record.ready = true; - }), + await callbacks.run('streamMessage', { ...msg, name: room.name || '' }); + + fireGlobalEvent('new-message', { + ...msg, + name: room.name || '', + room: { + type, + name, + }, + }); + }), + + // when we receive a messages imported event we just clear the room history and fetch it again + sdk.stream('notify-room', [`${record.rid}/messagesImported`], async () => { + await RoomHistoryManager.clear(record.rid); + await RoomHistoryManager.getMore(record.rid); + }), + sdk.stream('notify-room', [`${record.rid}/deleteMessage`], (msg) => { + Messages.state.delete(msg._id); + + // remove thread refenrece from deleted message + Messages.state.update( + (record) => record.tmid === msg._id, + ({ tmid: _, ...record }) => record, + ); + }), + sdk.stream( + 'notify-room', + [`${record.rid}/deleteMessageBulk`], + ({ rid, ts, excludePinned, ignoreDiscussion, users, ids, showDeletedStatus }) => { + const query: Filter = { rid }; + + if (ids) { + query._id = { $in: ids }; + } else { + query.ts = ts; + } + if (excludePinned) { + query.pinned = { $ne: true }; + } + if (ignoreDiscussion) { + query.drid = { $exists: false }; + } + if (users?.length) { + query['u.username'] = { $in: users }; + } + + const predicate = createPredicateFromFilter(query); + + if (showDeletedStatus) { + return Messages.state.update(predicate, (record) => ({ + ...record, + t: 'rm', + msg: '', + urls: [], + mentions: [], + attachments: [], + reactions: {}, + })); + } + + return Messages.state.remove(predicate); + }, + ), + + sdk.stream('notify-room', [`${record.rid}/messagesRead`], ({ tmid, until }) => { + if (tmid) { + Messages.state.update( + (record) => record.tmid === tmid && record.unread === true, + ({ unread: _, ...record }) => record, + ); + return; + } + Messages.state.update( + (r) => + r.rid === record.rid && r.unread === true && r.ts.getTime() < until.getTime() && (r.tmid === undefined || r.tshow === true), + ({ unread: _, ...r }) => r, + ); + }), + ], ); - openedRoomsDependency.changed(); -}); + + const [streamRoomMessages] = streams; + + void streamRoomMessages.ready().then(() => { + setPropertyByRid(record.rid, 'streamActive', true); + }); + + record.stream = { + stop: () => { + streams.forEach((stream) => stream.stop()); + }, + }; + + record.ready = true; +}; function open({ typeName, rid }: { typeName: string; rid: IRoom['_id'] }) { if (!openedRooms[typeName]) { openedRooms[typeName] = { typeName, rid, - active: false, ready: false, - unreadSince: new ReactiveVar(undefined), + unreadSince: undefined, lastSeen: new Date(), }; } @@ -228,21 +262,7 @@ function open({ typeName, rid }: { typeName: string; rid: IRoom['_id'] }) { closeOlderRooms(); } - if (CachedChatSubscription.ready.get() === true) { - if (openedRooms[typeName].active !== true) { - openedRooms[typeName].active = true; - if (computation) { - computation.invalidate(); - } - } - } - - return { - ready() { - openedRoomsDependency.depend(); - return openedRooms[typeName].ready; - }, - }; + openRoom(typeName, openedRooms[typeName]); } let openedRoom: string | undefined = undefined; @@ -259,16 +279,13 @@ export const LegacyRoomManager = { get openedRooms() { return openedRooms; }, - + listenRoomPropsByRid, + setPropertyByRid, getOpenedRoomByRid, close, closeAllRooms, - get computation() { - return computation; - }, - open, }; diff --git a/apps/meteor/client/lib/RoomManager.ts b/apps/meteor/client/lib/RoomManager.ts index f0cfd7f7973f1..a7933f50ec595 100644 --- a/apps/meteor/client/lib/RoomManager.ts +++ b/apps/meteor/client/lib/RoomManager.ts @@ -1,8 +1,9 @@ import type { IRoom } from '@rocket.chat/core-typings'; import { Emitter } from '@rocket.chat/emitter'; -import { useSyncExternalStore } from 'react'; +import { useMemo, useSyncExternalStore } from 'react'; import { getConfig } from './utils/getConfig'; +import { LegacyRoomManager } from '../../app/ui-utils/client'; import { RoomHistoryManager } from '../../app/ui-utils/client/lib/RoomHistoryManager'; const debug = !!(getConfig('debug') || getConfig('debug-RoomStore')); @@ -154,4 +155,20 @@ const subscribeOpenedSecondLevelRoom = [ export const useOpenedRoom = (): IRoom['_id'] | undefined => useSyncExternalStore(...subscribeOpenedRoom); +export const useOpenedRoomUnreadSince = (): Date | undefined => { + const rid = useOpenedRoom(); + + const { subscribe, getSnapshotValue } = useMemo(() => { + if (!rid) { + return { + subscribe: () => () => void 0, + getSnapshotValue: () => undefined, + }; + } + return LegacyRoomManager.listenRoomPropsByRid(rid, 'unreadSince'); + }, [rid]); + + return useSyncExternalStore(subscribe, getSnapshotValue); +}; + export const useSecondLevelOpenedRoom = (): IRoom['_id'] | undefined => useSyncExternalStore(...subscribeOpenedSecondLevelRoom); diff --git a/apps/meteor/client/lib/chats/readStateManager.ts b/apps/meteor/client/lib/chats/readStateManager.ts index 38314073f3af8..f3b064e5c352a 100644 --- a/apps/meteor/client/lib/chats/readStateManager.ts +++ b/apps/meteor/client/lib/chats/readStateManager.ts @@ -50,7 +50,7 @@ export class ReadStateManager extends Emitter { const firstUpdate = !this.subscription; this.subscription = subscription; - LegacyRoomManager.getOpenedRoomByRid(this.rid)?.unreadSince.set(this.subscription.ls); + LegacyRoomManager.setPropertyByRid(this.rid, 'unreadSince', this.subscription.ls); const { unread, alert } = this.subscription; if (!unread && !alert) { diff --git a/apps/meteor/client/views/room/RoomOpenerEmbedded.tsx b/apps/meteor/client/views/room/RoomOpenerEmbedded.tsx index 2dad2a0bc6a15..1b58a9d1c318f 100644 --- a/apps/meteor/client/views/room/RoomOpenerEmbedded.tsx +++ b/apps/meteor/client/views/room/RoomOpenerEmbedded.tsx @@ -63,8 +63,10 @@ const RoomOpenerEmbedded = ({ type, reference }: RoomOpenerProps): ReactElement CachedChatSubscription.upsertSubscription(mapSubscriptionFromApi(subscriptionData.subscription)); - LegacyRoomManager.computation.invalidate(); - }, [subscriptionData]); + // yes this must be done here, this is already called in useOpenRoom, but it skips subscription streams because of the subscriptions list is empty + // now that we inserted the subscription, we can open the room + LegacyRoomManager.open({ typeName: type + reference, rid: subscriptionData.subscription.rid }); + }, [subscriptionData, type, rid, reference]); useEffect(() => { if (!uid) { diff --git a/apps/meteor/client/views/room/body/hooks/useUnreadMessages.ts b/apps/meteor/client/views/room/body/hooks/useUnreadMessages.ts index ddae689fdd2a3..9ccbda0d38d75 100644 --- a/apps/meteor/client/views/room/body/hooks/useUnreadMessages.ts +++ b/apps/meteor/client/views/room/body/hooks/useUnreadMessages.ts @@ -4,9 +4,10 @@ import type { Dispatch, MutableRefObject, SetStateAction } from 'react'; import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; import { Messages } from '../../../../../app/models/client'; -import { LegacyRoomManager, RoomHistoryManager } from '../../../../../app/ui-utils/client'; +import { RoomHistoryManager } from '../../../../../app/ui-utils/client'; import { withDebouncing, withThrottling } from '../../../../../lib/utils/highOrderFunctions'; import { useReactiveValue } from '../../../../hooks/useReactiveValue'; +import { useOpenedRoomUnreadSince } from '../../../../lib/RoomManager'; import { roomCoordinator } from '../../../../lib/rooms/roomCoordinator'; import { setMessageJumpQueryStringParameter } from '../../../../lib/utils/setMessageJumpQueryStringParameter'; import { useChat } from '../../contexts/ChatContext'; @@ -22,7 +23,7 @@ const useUnreadMessages = (room: IRoom): readonly [data: IUnreadMessages | undef const count = useMemo(() => notLoadedCount + loadedCount, [notLoadedCount, loadedCount]); - const since = useReactiveValue(useCallback(() => LegacyRoomManager.getOpenedRoomByRid(room._id)?.unreadSince.get(), [room._id])); + const since = useOpenedRoomUnreadSince(); return useMemo(() => { if (count && since) { diff --git a/apps/meteor/client/views/room/composer/ComposerMessage.tsx b/apps/meteor/client/views/room/composer/ComposerMessage.tsx index 49500509830c5..cefe16936beeb 100644 --- a/apps/meteor/client/views/room/composer/ComposerMessage.tsx +++ b/apps/meteor/client/views/room/composer/ComposerMessage.tsx @@ -1,11 +1,10 @@ import type { IMessage, ISubscription } from '@rocket.chat/core-typings'; import { useToastMessageDispatch } from '@rocket.chat/ui-contexts'; import type { ReactElement, ReactNode } from 'react'; -import { memo, useCallback, useMemo } from 'react'; +import { memo, useMemo, useSyncExternalStore } from 'react'; import ComposerSkeleton from './ComposerSkeleton'; import { LegacyRoomManager } from '../../../../app/ui-utils/client'; -import { useReactiveValue } from '../../../hooks/useReactiveValue'; import { useChat } from '../contexts/ChatContext'; import { useRoom } from '../contexts/RoomContext'; import MessageBox from './messageBox/MessageBox'; @@ -81,9 +80,11 @@ const ComposerMessage = ({ tmid, onSend, ...props }: ComposerMessageProps): Reac [chat?.data, chat?.flows, chat?.action, chat?.composer?.text, chat?.messageEditing, dispatchToastMessage, onSend], ); - const publicationReady = useReactiveValue( - useCallback(() => LegacyRoomManager.getOpenedRoomByRid(room._id)?.streamActive ?? false, [room._id]), - ); + const { subscribe, getSnapshotValue } = useMemo(() => { + return LegacyRoomManager.listenRoomPropsByRid(room._id, 'streamActive'); + }, [room._id]); + + const publicationReady = useSyncExternalStore(subscribe, getSnapshotValue); if (!publicationReady) { return ;