Skip to content
Merged
7 changes: 7 additions & 0 deletions app/definitions/IDDPMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export interface IDDPMessage {
msg: string;
fields: {
eventName: string;
args: any;
};
}
1 change: 1 addition & 0 deletions app/definitions/ISubscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export interface ISubscription {
avatarETag?: string;
teamId?: string;
teamMain?: boolean;
unsubscribe: () => Promise<any>;
// https://nozbe.github.io/WatermelonDB/Relation.html#relation-api
messages: Relation<TMessageModel>;
threads: Relation<TThreadModel>;
Expand Down
2 changes: 1 addition & 1 deletion app/definitions/IThreadMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export interface IThreadMessage {
tmsg?: string;
msg?: string;
t?: MessageType;
rid: string;
rid?: string;
ts: string | Date;
u: IUserMessage;
alias?: string;
Expand Down
7 changes: 4 additions & 3 deletions app/lib/methods/readMessages.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import database from '../database';
import log from '../../utils/log';
import { TSubscriptionModel } from '../../definitions';
import { IRocketChat } from '../../definitions/IRocketChat';
import sdk from '../rocketchat/services/sdk';

export default async function readMessages(this: IRocketChat, rid: string, ls: Date, updateLastOpen = false): Promise<void> {
export default async function readMessages(rid: string, ls: Date, updateLastOpen = false): Promise<void> {
try {
const db = database.active;
const subscription = await db.get('subscriptions').find(rid);

// RC 0.61.0
await this.sdk.post('subscriptions.read', { rid });
// @ts-ignore
await sdk.post('subscriptions.read', { rid });

await db.write(async () => {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,41 @@ import debounce from '../../../utils/debounce';
import RocketChat from '../../rocketchat';
import { subscribeRoom, unsubscribeRoom } from '../../../actions/room';
import { Encryption } from '../../encryption';
import { IMessage, TMessageModel, TSubscriptionModel, TThreadMessageModel, TThreadModel } from '../../../definitions';
import { IDDPMessage } from '../../../definitions/IDDPMessage';

const WINDOW_TIME = 1000;

export default class RoomSubscription {
constructor(rid) {
private rid: string;
private isAlive: boolean;
private timer: null | number;
private queue: { [key: string]: IMessage };
private messagesBatch: {};
private _messagesBatch: { [key: string]: TMessageModel };
private threadsBatch: {};
private _threadsBatch: { [key: string]: TThreadModel };
private threadMessagesBatch: {};
private _threadMessagesBatch: { [key: string]: TThreadMessageModel };
private promises?: Promise<TSubscriptionModel[]>;
private connectedListener?: Promise<any>;
private disconnectedListener?: Promise<any>;
private notifyRoomListener?: Promise<any>;
private messageReceivedListener?: Promise<any>;
private lastOpen?: Date;

constructor(rid: string) {
this.rid = rid;
this.isAlive = true;
this.timer = null;
this.queue = {};
this.messagesBatch = {};
this.threadsBatch = {};
this.threadMessagesBatch = {};

this._messagesBatch = {};
this._threadsBatch = {};
this._threadMessagesBatch = {};
}

subscribe = async () => {
Expand All @@ -41,7 +64,7 @@ export default class RoomSubscription {
this.notifyRoomListener = RocketChat.onStreamData('stream-notify-room', this.handleNotifyRoomReceived);
this.messageReceivedListener = RocketChat.onStreamData('stream-room-messages', this.handleMessageReceived);
if (!this.isAlive) {
this.unsubscribe();
await this.unsubscribe();
}

reduxStore.dispatch(subscribeRoom(this.rid));
Expand Down Expand Up @@ -69,7 +92,7 @@ export default class RoomSubscription {
}
};

removeListener = async promise => {
removeListener = async (promise?: Promise<any>): Promise<void> => {
if (promise) {
try {
const listener = await promise;
Expand All @@ -85,7 +108,7 @@ export default class RoomSubscription {
RocketChat.loadMissedMessages({ rid: this.rid }).catch(e => console.log(e));
};

handleNotifyRoomReceived = protectedFunction(ddpMessage => {
handleNotifyRoomReceived = protectedFunction((ddpMessage: IDDPMessage) => {
const [_rid, ev] = ddpMessage.fields.eventName.split('/');
if (this.rid !== _rid) {
return;
Expand Down Expand Up @@ -115,9 +138,9 @@ export default class RoomSubscription {
const msgCollection = db.get('messages');
const threadsCollection = db.get('threads');
const threadMessagesCollection = db.get('thread_messages');
let deleteMessage;
let deleteThread;
let deleteThreadMessage;
let deleteMessage: TMessageModel;
let deleteThread: TThreadModel;
let deleteThreadMessage: TThreadMessageModel;

// Delete message
try {
Expand All @@ -142,7 +165,7 @@ export default class RoomSubscription {
} catch (e) {
// Do nothing
}
await db.action(async () => {
await db.write(async () => {
await db.batch(deleteMessage, deleteThread, deleteThreadMessage);
});
} catch (e) {
Expand All @@ -153,11 +176,11 @@ export default class RoomSubscription {
}
});

read = debounce(lastOpen => {
read = debounce((lastOpen: Date) => {
RocketChat.readMessages(this.rid, lastOpen);
}, 300);

updateMessage = message =>
updateMessage = (message: IMessage): Promise<void> =>
new Promise(async resolve => {
if (this.rid !== message.rid) {
return resolve();
Expand All @@ -177,15 +200,15 @@ export default class RoomSubscription {
const messageRecord = await getMessageById(message._id);
if (messageRecord) {
operation = messageRecord.prepareUpdate(
protectedFunction(m => {
protectedFunction((m: TMessageModel) => {
Object.assign(m, message);
})
);
} else {
operation = msgCollection.prepareCreate(
protectedFunction(m => {
protectedFunction((m: TMessageModel) => {
m._raw = sanitizedRaw({ id: message._id }, msgCollection.schema);
m.subscription.id = this.rid;
if (m.subscription) m.subscription.id = this.rid;
Object.assign(m, message);
})
);
Expand All @@ -202,13 +225,13 @@ export default class RoomSubscription {
const threadRecord = await getThreadById(message._id);
if (threadRecord) {
operation = threadRecord.prepareUpdate(
protectedFunction(t => {
protectedFunction((t: TThreadModel) => {
Object.assign(t, message);
})
);
} else {
operation = threadsCollection.prepareCreate(
protectedFunction(t => {
protectedFunction((t: TThreadModel) => {
t._raw = sanitizedRaw({ id: message._id }, threadsCollection.schema);
t.subscription.id = this.rid;
Object.assign(t, message);
Expand All @@ -228,18 +251,18 @@ export default class RoomSubscription {
const threadMessageRecord = await getThreadMessageById(message._id);
if (threadMessageRecord) {
operation = threadMessageRecord.prepareUpdate(
protectedFunction(tm => {
protectedFunction((tm: TThreadMessageModel) => {
Object.assign(tm, message);
tm.rid = message.tmid;
delete tm.tmid;
})
);
} else {
operation = threadMessagesCollection.prepareCreate(
protectedFunction(tm => {
protectedFunction((tm: TThreadMessageModel) => {
tm._raw = sanitizedRaw({ id: message._id }, threadMessagesCollection.schema);
Object.assign(tm, message);
tm.subscription.id = this.rid;
if (tm.subscription) tm.subscription.id = this.rid;
tm.rid = message.tmid;
delete tm.tmid;
})
Expand All @@ -254,7 +277,7 @@ export default class RoomSubscription {
return resolve();
});

handleMessageReceived = ddpMessage => {
handleMessageReceived = (ddpMessage: IDDPMessage) => {
if (!this.timer) {
this.timer = setTimeout(async () => {
// copy variables values to local and clean them
Expand All @@ -280,7 +303,7 @@ export default class RoomSubscription {

try {
const db = database.active;
await db.action(async () => {
await db.write(async () => {
await db.batch(
...Object.values(this._messagesBatch),
...Object.values(this._threadsBatch),
Expand Down